iota_storage/
write_path_pending_tx_log.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! `WritePathPendingTransactionLog` is used in the transaction write path
//! (e.g. in `TransactionOrchestrator`) for transaction submission processing.
//! It helps to ensure that:
//! 1. At any given time, a transaction is processed only once.
//! 2. When a fullnode crashes and restarts, the pending transactions are
//!    loaded and retried.
11
12use std::path::PathBuf;
13
14use iota_types::{
15    base_types::TransactionDigest,
16    crypto::EmptySignInfo,
17    error::{IotaError, IotaResult},
18    message_envelope::TrustedEnvelope,
19    transaction::{SenderSignedData, VerifiedTransaction},
20};
21use tracing::instrument;
22use typed_store::{
23    DBMapUtils,
24    rocks::{DBMap, MetricConf},
25    traits::{Map, TableSummary, TypedStoreDebug},
26};
27
28use crate::mutex_table::MutexTable;
29
/// Shard count handed to `MutexTable::new` for the per-digest lock table.
const NUM_SHARDS: usize = 4096;

/// Result flag of `write_pending_transaction_maybe`: `true` when the call
/// inserted the record, `false` when a record for the digest already existed.
pub type IsFirstRecord = bool;
32
33#[derive(DBMapUtils)]
34struct WritePathPendingTransactionTable {
35    logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
36}
37
38pub struct WritePathPendingTransactionLog {
39    pending_transactions: WritePathPendingTransactionTable,
40    mutex_table: MutexTable<TransactionDigest>,
41}
42
43impl WritePathPendingTransactionLog {
44    pub fn new(path: PathBuf) -> Self {
45        let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
46            path,
47            MetricConf::new("pending_tx_log"),
48            None,
49            None,
50        );
51        Self {
52            pending_transactions,
53            mutex_table: MutexTable::new(NUM_SHARDS),
54        }
55    }
56
57    // Returns whether the table currently has this transaction in record.
58    // If not, write the transaction and return true; otherwise return false.
59    // Because the record will be cleaned up when the transaction finishes,
60    // even when it returns true, the callsite of this function should check
61    // the transaction status before doing anything, to avoid duplicates.
62    #[instrument(level = "trace", skip_all, fields(tx_digest = ?tx.digest()))]
63    pub async fn write_pending_transaction_maybe(
64        &self,
65        tx: &VerifiedTransaction,
66    ) -> IotaResult<IsFirstRecord> {
67        let tx_digest = tx.digest();
68        let _guard = self.mutex_table.acquire_lock(*tx_digest);
69        if self.pending_transactions.logs.contains_key(tx_digest)? {
70            Ok(false)
71        } else {
72            self.pending_transactions
73                .logs
74                .insert(tx_digest, tx.serializable_ref())?;
75            Ok(true)
76        }
77    }
78
79    // This function does not need to be behind a lock because:
80    // 1. there could be more than one callsite but the deletion is idempotent.
81    // 2. it does not race with the insert (`write_pending_transaction_maybe`) in a
82    //    way that we care. 2.a. for one transaction, `finish_transaction` shouldn't
83    //    predate `write_pending_transaction_maybe`. 2.b  for concurrent requests of
84    //    one transaction, a call to this function may happen in between hence
85    //    making the second request thinks it is the first record. It's preventable
86    //    by checking this transaction again after the call of
87    //    `write_pending_transaction_maybe`.
88    pub fn finish_transaction(&self, tx: &TransactionDigest) -> IotaResult {
89        let mut write_batch = self.pending_transactions.logs.batch();
90        write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
91        write_batch.write().map_err(IotaError::from)
92    }
93
94    pub fn load_all_pending_transactions(&self) -> IotaResult<Vec<VerifiedTransaction>> {
95        Ok(self
96            .pending_transactions
97            .logs
98            .safe_iter()
99            .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
100            .collect::<Result<Vec<_>, _>>()?)
101    }
102}
103
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use anyhow;
    use iota_types::utils::create_fake_transaction;

    use super::*;

    /// Wraps a freshly generated fake transaction as verified, skipping checks.
    fn fake_verified_tx() -> VerifiedTransaction {
        VerifiedTransaction::new_unchecked(create_fake_transaction())
    }

    /// Collects the digests of all transactions currently stored in `log`.
    fn pending_digests(
        log: &WritePathPendingTransactionLog,
    ) -> anyhow::Result<HashSet<TransactionDigest>> {
        let pending = log.load_all_pending_transactions()?;
        Ok(pending.iter().map(|t| *t.digest()).collect())
    }

    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());

        let tx = fake_verified_tx();
        let tx_digest = *tx.digest();

        // First write creates the record.
        let is_first = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(is_first);
        // The second write will return false.
        let is_first_again = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(!is_first_again);

        let loaded_txes = log.load_all_pending_transactions()?;
        assert_eq!(vec![tx], loaded_txes);

        log.finish_transaction(&tx_digest).unwrap();
        assert!(log.load_all_pending_transactions()?.is_empty());

        // It's ok to finish an already finished transaction.
        log.finish_transaction(&tx_digest).unwrap();

        // Write a batch of transactions, then finish only the first half.
        let txes: Vec<_> = std::iter::repeat_with(fake_verified_tx).take(10).collect();
        for tx in &txes {
            let is_first = log.write_pending_transaction_maybe(tx).await.unwrap();
            assert!(is_first);
        }
        let all_digests: HashSet<_> = txes.iter().map(|t| *t.digest()).collect();
        assert_eq!(all_digests, pending_digests(&log)?);

        let (finished, still_pending) = txes.split_at(5);
        for tx in finished {
            log.finish_transaction(tx.digest()).unwrap();
        }
        let remaining_digests: HashSet<_> = still_pending.iter().map(|t| *t.digest()).collect();
        assert_eq!(remaining_digests, pending_digests(&log)?);

        Ok(())
    }
}
183}