iota_storage/
write_path_pending_tx_log.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
//! `WritePathPendingTransactionLog` is used in the transaction write path
//! (e.g. in `TransactionOrchestrator`) for transaction submission processing.
//! It helps to ensure that:
//! 1. A transaction is processed only once at any given time.
//! 2. When a fullnode crashes and restarts, the pending transactions are
//!    loaded and retried.
11
12use std::path::PathBuf;
13
14use iota_types::{
15    base_types::TransactionDigest,
16    crypto::EmptySignInfo,
17    error::{IotaError, IotaResult},
18    message_envelope::TrustedEnvelope,
19    transaction::{SenderSignedData, VerifiedTransaction},
20};
21use typed_store::{
22    DBMapUtils,
23    rocks::{DBMap, MetricConf},
24    traits::{Map, TableSummary, TypedStoreDebug},
25};
26
/// Result flag of `WritePathPendingTransactionLog::write_pending_transaction_maybe`.
///
/// `true` means the call inserted the pending record and its commit
/// succeeded; `false` means a record for the digest was already present or
/// the commit did not succeed.
pub type IsFirstRecord = bool;
28
29#[derive(DBMapUtils)]
30struct WritePathPendingTransactionTable {
31    logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
32}
33
34pub struct WritePathPendingTransactionLog {
35    pending_transactions: WritePathPendingTransactionTable,
36}
37
38impl WritePathPendingTransactionLog {
39    pub fn new(path: PathBuf) -> Self {
40        let pending_transactions = WritePathPendingTransactionTable::open_tables_transactional(
41            path,
42            MetricConf::new("pending_tx_log"),
43            None,
44            None,
45        );
46        Self {
47            pending_transactions,
48        }
49    }
50
51    // Returns whether the table currently has this transaction in record.
52    // If not, write the transaction and return true; otherwise return false.
53    // Because the record will be cleaned up when the transaction finishes,
54    // even when it returns true, the callsite of this function should check
55    // the transaction status before doing anything, to avoid duplicates.
56    pub async fn write_pending_transaction_maybe(
57        &self,
58        tx: &VerifiedTransaction,
59    ) -> IotaResult<IsFirstRecord> {
60        let tx_digest = tx.digest();
61        let mut transaction = self.pending_transactions.logs.transaction()?;
62        if transaction
63            .get(&self.pending_transactions.logs, tx_digest)?
64            .is_some()
65        {
66            return Ok(false);
67        }
68        transaction.insert_batch(
69            &self.pending_transactions.logs,
70            [(tx_digest, tx.serializable_ref())],
71        )?;
72        let result = transaction.commit();
73        Ok(result.is_ok())
74    }
75
76    // This function does not need to be behind a lock because:
77    // 1. there could be more than one callsite but the deletion is idempotent.
78    // 2. it does not race with the insert (`write_pending_transaction_maybe`) in a
79    //    way that we care. 2.a. for one transaction, `finish_transaction` shouldn't
80    //    predate `write_pending_transaction_maybe`. 2.b  for concurrent requests of
81    //    one transaction, a call to this function may happen in between hence
82    //    making the second request thinks it is the first record. It's preventable
83    //    by checking this transaction again after the call of
84    //    `write_pending_transaction_maybe`.
85    pub fn finish_transaction(&self, tx: &TransactionDigest) -> IotaResult {
86        let mut write_batch = self.pending_transactions.logs.batch();
87        write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
88        write_batch.write().map_err(IotaError::from)
89    }
90
91    pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
92        self.pending_transactions
93            .logs
94            .unbounded_iter()
95            .map(|(_tx_digest, tx)| VerifiedTransaction::from(tx))
96            .collect()
97    }
98}
99
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use iota_types::utils::create_fake_transaction;

    use super::*;

    /// Collects the digests of all transactions currently stored in `log`.
    fn pending_digests(log: &WritePathPendingTransactionLog) -> HashSet<TransactionDigest> {
        log.load_all_pending_transactions()
            .iter()
            .map(|t| *t.digest())
            .collect()
    }

    /// Exercises insert, duplicate insert, load, and (repeated) finish on a
    /// log backed by a temporary directory.
    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let pending_txes = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());
        let tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
        let tx_digest = *tx.digest();

        // The first write records the transaction.
        assert!(
            pending_txes
                .write_pending_transaction_maybe(&tx)
                .await
                .unwrap()
        );
        // The second write will return false
        assert!(
            !pending_txes
                .write_pending_transaction_maybe(&tx)
                .await
                .unwrap()
        );

        let loaded_txes = pending_txes.load_all_pending_transactions();
        assert_eq!(vec![tx], loaded_txes);

        pending_txes.finish_transaction(&tx_digest).unwrap();
        let loaded_txes = pending_txes.load_all_pending_transactions();
        assert!(loaded_txes.is_empty());

        // It's ok to finish an already finished transaction
        pending_txes.finish_transaction(&tx_digest).unwrap();

        // Test writing and finishing more transactions
        let txes: Vec<_> = (0..10)
            .map(|_| VerifiedTransaction::new_unchecked(create_fake_transaction()))
            .collect();
        for tx in &txes {
            assert!(
                pending_txes
                    .write_pending_transaction_maybe(tx)
                    .await
                    .unwrap()
            );
        }
        assert_eq!(
            txes.iter().map(|t| *t.digest()).collect::<HashSet<_>>(),
            pending_digests(&pending_txes)
        );

        // Finishing the first half must leave exactly the second half behind.
        for tx in txes.iter().take(5) {
            pending_txes.finish_transaction(tx.digest()).unwrap();
        }
        assert_eq!(
            txes.iter()
                .skip(5)
                .map(|t| *t.digest())
                .collect::<HashSet<_>>(),
            pending_digests(&pending_txes)
        );

        Ok(())
    }
}