iota_storage/
write_path_pending_tx_log.rs1use std::path::PathBuf;
13
14use iota_types::{
15 base_types::TransactionDigest,
16 crypto::EmptySignInfo,
17 error::{IotaError, IotaResult},
18 message_envelope::TrustedEnvelope,
19 transaction::{SenderSignedData, VerifiedTransaction},
20};
21use typed_store::{
22 DBMapUtils,
23 rocks::{DBMap, MetricConf},
24 traits::{Map, TableSummary, TypedStoreDebug},
25};
26
27pub type IsFirstRecord = bool;
28
29#[derive(DBMapUtils)]
30struct WritePathPendingTransactionTable {
31 logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
32}
33
34pub struct WritePathPendingTransactionLog {
35 pending_transactions: WritePathPendingTransactionTable,
36}
37
38impl WritePathPendingTransactionLog {
39 pub fn new(path: PathBuf) -> Self {
40 let pending_transactions = WritePathPendingTransactionTable::open_tables_transactional(
41 path,
42 MetricConf::new("pending_tx_log"),
43 None,
44 None,
45 );
46 Self {
47 pending_transactions,
48 }
49 }
50
51 pub async fn write_pending_transaction_maybe(
57 &self,
58 tx: &VerifiedTransaction,
59 ) -> IotaResult<IsFirstRecord> {
60 let tx_digest = tx.digest();
61 let mut transaction = self.pending_transactions.logs.transaction()?;
62 if transaction
63 .get(&self.pending_transactions.logs, tx_digest)?
64 .is_some()
65 {
66 return Ok(false);
67 }
68 transaction.insert_batch(
69 &self.pending_transactions.logs,
70 [(tx_digest, tx.serializable_ref())],
71 )?;
72 let result = transaction.commit();
73 Ok(result.is_ok())
74 }
75
76 pub fn finish_transaction(&self, tx: &TransactionDigest) -> IotaResult {
86 let mut write_batch = self.pending_transactions.logs.batch();
87 write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
88 write_batch.write().map_err(IotaError::from)
89 }
90
91 pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
92 self.pending_transactions
93 .logs
94 .unbounded_iter()
95 .map(|(_tx_digest, tx)| VerifiedTransaction::from(tx))
96 .collect()
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use std::collections::HashSet;
103
104 use anyhow;
105 use iota_types::utils::create_fake_transaction;
106
107 use super::*;
108
109 #[tokio::test]
110 async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
111 let temp_dir = tempfile::tempdir().unwrap();
112 let pending_txes = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());
113 let tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
114 let tx_digest = *tx.digest();
115 assert!(
116 pending_txes
117 .write_pending_transaction_maybe(&tx)
118 .await
119 .unwrap()
120 );
121 assert!(
123 !pending_txes
124 .write_pending_transaction_maybe(&tx)
125 .await
126 .unwrap()
127 );
128
129 let loaded_txes = pending_txes.load_all_pending_transactions();
130 assert_eq!(vec![tx], loaded_txes);
131
132 pending_txes.finish_transaction(&tx_digest).unwrap();
133 let loaded_txes = pending_txes.load_all_pending_transactions();
134 assert!(loaded_txes.is_empty());
135
136 pending_txes.finish_transaction(&tx_digest).unwrap();
138
139 let txes: Vec<_> = (0..10)
141 .map(|_| VerifiedTransaction::new_unchecked(create_fake_transaction()))
142 .collect();
143 for tx in txes.iter().take(10) {
144 assert!(
145 pending_txes
146 .write_pending_transaction_maybe(tx)
147 .await
148 .unwrap()
149 );
150 }
151 let loaded_tx_digests: HashSet<_> = pending_txes
152 .load_all_pending_transactions()
153 .iter()
154 .map(|t| *t.digest())
155 .collect();
156 assert_eq!(
157 txes.iter().map(|t| *t.digest()).collect::<HashSet<_>>(),
158 loaded_tx_digests
159 );
160
161 for tx in txes.iter().take(5) {
162 pending_txes.finish_transaction(tx.digest()).unwrap();
163 }
164 let loaded_tx_digests: HashSet<_> = pending_txes
165 .load_all_pending_transactions()
166 .iter()
167 .map(|t| *t.digest())
168 .collect();
169 assert_eq!(
170 txes.iter()
171 .skip(5)
172 .map(|t| *t.digest())
173 .collect::<HashSet<_>>(),
174 loaded_tx_digests
175 );
176
177 Ok(())
178 }
179}