iota_storage/
write_path_pending_tx_log.rs1use std::path::PathBuf;
13
14use iota_types::{
15 base_types::TransactionDigest,
16 crypto::EmptySignInfo,
17 error::{IotaError, IotaResult},
18 message_envelope::TrustedEnvelope,
19 transaction::{SenderSignedData, VerifiedTransaction},
20};
21use tracing::instrument;
22use typed_store::{
23 DBMapUtils,
24 rocks::{DBMap, MetricConf},
25 traits::{Map, TableSummary, TypedStoreDebug},
26};
27
/// Flag returned by [`WritePathPendingTransactionLog::write_pending_transaction_maybe`]:
/// `true` when the call recorded the transaction, `false` when it did not
/// (for instance because a record for that digest was already present).
pub type IsFirstRecord = bool;
29
30#[derive(DBMapUtils)]
31struct WritePathPendingTransactionTable {
32 logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
33}
34
35pub struct WritePathPendingTransactionLog {
36 pending_transactions: WritePathPendingTransactionTable,
37}
38
39impl WritePathPendingTransactionLog {
40 pub fn new(path: PathBuf) -> Self {
41 let pending_transactions = WritePathPendingTransactionTable::open_tables_transactional(
42 path,
43 MetricConf::new("pending_tx_log"),
44 None,
45 None,
46 );
47 Self {
48 pending_transactions,
49 }
50 }
51
52 #[instrument(level = "trace", skip_all, fields(tx_digest = ?tx.digest()))]
58 pub async fn write_pending_transaction_maybe(
59 &self,
60 tx: &VerifiedTransaction,
61 ) -> IotaResult<IsFirstRecord> {
62 let tx_digest = tx.digest();
63 let mut transaction = self.pending_transactions.logs.transaction()?;
64 if transaction
65 .get(&self.pending_transactions.logs, tx_digest)?
66 .is_some()
67 {
68 return Ok(false);
69 }
70 transaction.insert_batch(
71 &self.pending_transactions.logs,
72 [(tx_digest, tx.serializable_ref())],
73 )?;
74 let result = transaction.commit();
75 Ok(result.is_ok())
76 }
77
78 pub fn finish_transaction(&self, tx: &TransactionDigest) -> IotaResult {
88 let mut write_batch = self.pending_transactions.logs.batch();
89 write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
90 write_batch.write().map_err(IotaError::from)
91 }
92
93 pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
94 self.pending_transactions
95 .logs
96 .unbounded_iter()
97 .map(|(_tx_digest, tx)| VerifiedTransaction::from(tx))
98 .collect()
99 }
100}
101
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use anyhow;
    use iota_types::utils::create_fake_transaction;

    use super::*;

    /// Collects the digests of every transaction currently stored in `log`.
    fn logged_digests(log: &WritePathPendingTransactionLog) -> HashSet<TransactionDigest> {
        log.load_all_pending_transactions()
            .iter()
            .map(|tx| *tx.digest())
            .collect()
    }

    /// Exercises the write / load / finish cycle for a single transaction and
    /// then for a batch of ten.
    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());

        let tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
        let tx_digest = *tx.digest();

        // The first write records the transaction.
        let first_write = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(first_write);

        // Writing the same transaction again reports that it was already there.
        let second_write = log.write_pending_transaction_maybe(&tx).await.unwrap();
        assert!(!second_write);

        // Exactly that one transaction is loaded back.
        assert_eq!(vec![tx], log.load_all_pending_transactions());

        // Finishing the transaction removes it from the log.
        log.finish_transaction(&tx_digest).unwrap();
        assert!(log.load_all_pending_transactions().is_empty());

        // Finishing a digest that is no longer present still succeeds.
        log.finish_transaction(&tx_digest).unwrap();

        // Record ten fresh transactions; each one must be a first record.
        let txes: Vec<_> = (0..10)
            .map(|_| VerifiedTransaction::new_unchecked(create_fake_transaction()))
            .collect();
        for tx in &txes {
            let recorded = log.write_pending_transaction_maybe(tx).await.unwrap();
            assert!(recorded);
        }
        let all_digests: HashSet<_> = txes.iter().map(|tx| *tx.digest()).collect();
        assert_eq!(all_digests, logged_digests(&log));

        // Finish the first five; only the last five may remain.
        for tx in &txes[..5] {
            log.finish_transaction(tx.digest()).unwrap();
        }
        let remaining_digests: HashSet<_> = txes[5..].iter().map(|tx| *tx.digest()).collect();
        assert_eq!(remaining_digests, logged_digests(&log));

        Ok(())
    }
}