iota_storage/
write_path_pending_tx_log.rs1use std::path::PathBuf;
13
14use iota_types::{
15 base_types::TransactionDigest,
16 crypto::EmptySignInfo,
17 error::{IotaError, IotaResult},
18 message_envelope::TrustedEnvelope,
19 transaction::{SenderSignedData, VerifiedTransaction},
20};
21use tracing::instrument;
22use typed_store::{
23 DBMapUtils,
24 rocks::{DBMap, MetricConf},
25 traits::{Map, TableSummary, TypedStoreDebug},
26};
27
28use crate::mutex_table::MutexTable;
29
/// Number of shards handed to `MutexTable::new` when the log is constructed.
const NUM_SHARDS: usize = 4096;

/// Result flag of a "write if absent" attempt: `true` when the call stored a
/// new record, `false` when a record for the same key was already present.
pub type IsFirstRecord = bool;
32
33#[derive(DBMapUtils)]
34struct WritePathPendingTransactionTable {
35 logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
36}
37
38pub struct WritePathPendingTransactionLog {
39 pending_transactions: WritePathPendingTransactionTable,
40 mutex_table: MutexTable<TransactionDigest>,
41}
42
43impl WritePathPendingTransactionLog {
44 pub fn new(path: PathBuf) -> Self {
45 let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
46 path,
47 MetricConf::new("pending_tx_log"),
48 None,
49 None,
50 );
51 Self {
52 pending_transactions,
53 mutex_table: MutexTable::new(NUM_SHARDS),
54 }
55 }
56
57 #[instrument(level = "trace", skip_all, fields(tx_digest = ?tx.digest()))]
63 pub async fn write_pending_transaction_maybe(
64 &self,
65 tx: &VerifiedTransaction,
66 ) -> IotaResult<IsFirstRecord> {
67 let tx_digest = tx.digest();
68 let _guard = self.mutex_table.acquire_lock(*tx_digest);
69 if self.pending_transactions.logs.contains_key(tx_digest)? {
70 Ok(false)
71 } else {
72 self.pending_transactions
73 .logs
74 .insert(tx_digest, tx.serializable_ref())?;
75 Ok(true)
76 }
77 }
78
79 pub fn finish_transaction(&self, tx: &TransactionDigest) -> IotaResult {
89 let mut write_batch = self.pending_transactions.logs.batch();
90 write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
91 write_batch.write().map_err(IotaError::from)
92 }
93
94 pub fn load_all_pending_transactions(&self) -> IotaResult<Vec<VerifiedTransaction>> {
95 Ok(self
96 .pending_transactions
97 .logs
98 .safe_iter()
99 .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
100 .collect::<Result<Vec<_>, _>>()?)
101 }
102}
103
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use iota_types::utils::create_fake_transaction;

    use super::*;

    /// Returns the digests of all transactions currently stored in `log`.
    fn pending_digests(
        log: &WritePathPendingTransactionLog,
    ) -> anyhow::Result<HashSet<TransactionDigest>> {
        let digests = log
            .load_all_pending_transactions()?
            .iter()
            .map(|t| *t.digest())
            .collect();
        Ok(digests)
    }

    /// Exercises the write / load / finish cycle, first with a single
    /// transaction and then with a batch of ten.
    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());

        let tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
        let tx_digest = *tx.digest();

        // The first write stores the record and reports `true`.
        let first_write = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(first_write);

        // A second write of the same transaction reports `false`.
        let second_write = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(!second_write);

        // Exactly the one transaction is stored.
        let stored = log.load_all_pending_transactions()?;
        assert_eq!(vec![tx], stored);

        // Finishing removes it from the log.
        log.finish_transaction(&tx_digest).unwrap();
        let stored = log.load_all_pending_transactions()?;
        assert!(stored.is_empty());

        // Finishing a digest that is no longer present still succeeds.
        log.finish_transaction(&tx_digest).unwrap();

        // Write a batch of ten fresh transactions; each is a first record.
        let txes: Vec<_> = (0..10)
            .map(|_| VerifiedTransaction::new_unchecked(create_fake_transaction()))
            .collect();
        for pending in &txes {
            let is_first = log
                .write_pending_transaction_maybe(pending)
                .await
                .unwrap();
            assert!(is_first);
        }
        let stored_digests = pending_digests(&log)?;
        assert_eq!(
            txes.iter().map(|t| *t.digest()).collect::<HashSet<_>>(),
            stored_digests
        );

        // Finish the first five; only the last five must remain.
        let (finished, remaining) = txes.split_at(5);
        for done in finished {
            log.finish_transaction(done.digest()).unwrap();
        }
        let stored_digests = pending_digests(&log)?;
        assert_eq!(
            remaining
                .iter()
                .map(|t| *t.digest())
                .collect::<HashSet<_>>(),
            stored_digests
        );

        Ok(())
    }
}