iota_storage/
write_path_pending_tx_log.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! WritePathPendingTransactionLog is used in the transaction write path (e.g.
6//! in TransactionOrchestrator) for transaction submission processing. It helps
7//! to achieve:
8//! 1. At one time, a transaction is only processed once.
9//! 2. When Fullnode crashes and restarts, the pending transaction will be
10//!    loaded and retried.
11
12use std::path::PathBuf;
13
14use iota_types::{
15    base_types::TransactionDigest,
16    crypto::EmptySignInfo,
17    error::{IotaError, IotaResult},
18    message_envelope::TrustedEnvelope,
19    transaction::{SenderSignedData, VerifiedTransaction},
20};
21use tracing::instrument;
22use typed_store::{
23    DBMapUtils,
24    rocks::{DBMap, MetricConf},
25    traits::{Map, TableSummary, TypedStoreDebug},
26};
27
/// Flag returned by `WritePathPendingTransactionLog::write_pending_transaction_maybe`:
/// `true` when that call wrote the transaction into the log, `false` when a
/// record for the transaction was already present (or the write did not
/// commit).
pub type IsFirstRecord = bool;
29
30#[derive(DBMapUtils)]
31struct WritePathPendingTransactionTable {
32    logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
33}
34
35pub struct WritePathPendingTransactionLog {
36    pending_transactions: WritePathPendingTransactionTable,
37}
38
39impl WritePathPendingTransactionLog {
40    pub fn new(path: PathBuf) -> Self {
41        let pending_transactions = WritePathPendingTransactionTable::open_tables_transactional(
42            path,
43            MetricConf::new("pending_tx_log"),
44            None,
45            None,
46        );
47        Self {
48            pending_transactions,
49        }
50    }
51
52    // Returns whether the table currently has this transaction in record.
53    // If not, write the transaction and return true; otherwise return false.
54    // Because the record will be cleaned up when the transaction finishes,
55    // even when it returns true, the callsite of this function should check
56    // the transaction status before doing anything, to avoid duplicates.
57    #[instrument(level = "trace", skip_all, fields(tx_digest = ?tx.digest()))]
58    pub async fn write_pending_transaction_maybe(
59        &self,
60        tx: &VerifiedTransaction,
61    ) -> IotaResult<IsFirstRecord> {
62        let tx_digest = tx.digest();
63        let mut transaction = self.pending_transactions.logs.transaction()?;
64        if transaction
65            .get(&self.pending_transactions.logs, tx_digest)?
66            .is_some()
67        {
68            return Ok(false);
69        }
70        transaction.insert_batch(
71            &self.pending_transactions.logs,
72            [(tx_digest, tx.serializable_ref())],
73        )?;
74        let result = transaction.commit();
75        Ok(result.is_ok())
76    }
77
78    // This function does not need to be behind a lock because:
79    // 1. there could be more than one callsite but the deletion is idempotent.
80    // 2. it does not race with the insert (`write_pending_transaction_maybe`) in a
81    //    way that we care. 2.a. for one transaction, `finish_transaction` shouldn't
82    //    predate `write_pending_transaction_maybe`. 2.b  for concurrent requests of
83    //    one transaction, a call to this function may happen in between hence
84    //    making the second request thinks it is the first record. It's preventable
85    //    by checking this transaction again after the call of
86    //    `write_pending_transaction_maybe`.
87    pub fn finish_transaction(&self, tx: &TransactionDigest) -> IotaResult {
88        let mut write_batch = self.pending_transactions.logs.batch();
89        write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
90        write_batch.write().map_err(IotaError::from)
91    }
92
93    pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
94        self.pending_transactions
95            .logs
96            .unbounded_iter()
97            .map(|(_tx_digest, tx)| VerifiedTransaction::from(tx))
98            .collect()
99    }
100}
101
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use anyhow;
    use iota_types::utils::create_fake_transaction;

    use super::*;

    /// Builds a fresh fake transaction wrapped as verified without checks.
    fn fake_verified_tx() -> VerifiedTransaction {
        VerifiedTransaction::new_unchecked(create_fake_transaction())
    }

    /// Collects the digests of a slice of transactions into a set.
    fn digests_of(txes: &[VerifiedTransaction]) -> HashSet<TransactionDigest> {
        txes.iter().map(|t| *t.digest()).collect()
    }

    /// Collects the digests of every transaction currently stored in `log`.
    fn recorded_digests(log: &WritePathPendingTransactionLog) -> HashSet<TransactionDigest> {
        digests_of(&log.load_all_pending_transactions())
    }

    /// Exercises write / duplicate write / load / finish on a single
    /// transaction, then on a batch of ten of which the first five are
    /// finished.
    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        // Keep the directory guard bound so the database files outlive the test body.
        let temp_dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());

        let tx = fake_verified_tx();
        let tx_digest = *tx.digest();

        let is_first = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(is_first);
        // Writing the same transaction again must report that it is not the
        // first record.
        let is_first_again = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(!is_first_again);

        assert_eq!(vec![tx], log.load_all_pending_transactions());

        log.finish_transaction(&tx_digest).unwrap();
        assert!(log.load_all_pending_transactions().is_empty());

        // Finishing a transaction that is already finished is fine.
        log.finish_transaction(&tx_digest).unwrap();

        // Write a batch of transactions and finish part of it.
        let txes: Vec<_> = std::iter::repeat_with(fake_verified_tx).take(10).collect();
        for tx in &txes {
            assert!(log.write_pending_transaction_maybe(tx).await.unwrap());
        }
        assert_eq!(digests_of(&txes), recorded_digests(&log));

        let (finished, still_pending) = txes.split_at(5);
        for tx in finished {
            log.finish_transaction(tx.digest()).unwrap();
        }
        assert_eq!(digests_of(still_pending), recorded_digests(&log));

        Ok(())
    }
}