1use std::path::Path;
6
7use iota_types::{
8 base_types::{IotaAddress, ObjectID},
9 transaction::TransactionData,
10};
11use serde::{Deserialize, Serialize};
12use tracing::info;
13use typed_store::{
14 DBMapUtils, Map, TypedStoreError,
15 rocks::DBMap,
16 traits::{TableSummary, TypedStoreDebug},
17};
18use uuid::Uuid;
19
/// Write-ahead log persisted in a typed key-value store.
///
/// The single `log` table maps the [`ObjectID`] of a coin to the [`Entry`]
/// recorded for it. Entries are created by `reserve`, read back by `reclaim`,
/// updated by `increment_retry_count` / `set_in_flight`, and deleted by
/// `commit`.
#[derive(DBMapUtils, Clone)]
pub struct WriteAheadLog {
    pub log: DBMap<ObjectID, Entry>,
}
31
/// One record of the [`WriteAheadLog`], stored under the ID of the coin it
/// was reserved for.
///
/// The value is serialized into the store, so the field order and types are
/// part of the on-disk format.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Entry {
    /// Raw bytes of the UUID passed to `reserve` (convert back with
    /// `Uuid::from_bytes`).
    pub uuid: uuid::Bytes,
    /// Recipient address passed to `reserve`.
    pub recipient: IotaAddress,
    /// Transaction passed to `reserve`.
    pub tx: TransactionData,
    /// Starts at `0` when the entry is reserved and grows by one on every
    /// `increment_retry_count` call.
    pub retry_count: u64,
    /// Set to `true` when the entry is reserved; changed afterwards through
    /// `set_in_flight`.
    pub in_flight: bool,
}
41
42impl WriteAheadLog {
43 pub(crate) fn open(path: &Path) -> Self {
44 Self::open_tables_read_write(
45 path.to_path_buf(),
46 typed_store::rocks::MetricConf::new("faucet_write_ahead_log"),
47 None,
48 None,
49 )
50 }
51
52 pub(crate) fn reserve(
56 &mut self,
57 uuid: Uuid,
58 coin: ObjectID,
59 recipient: IotaAddress,
60 tx: TransactionData,
61 ) -> Result<(), TypedStoreError> {
62 if self.log.contains_key(&coin)? {
63 return Err(TypedStoreError::Serialization(format!(
66 "Duplicate WAL entry for coin {coin:?}",
67 )));
68 }
69
70 let uuid = *uuid.as_bytes();
71 self.log.insert(
72 &coin,
73 &Entry {
74 uuid,
75 recipient,
76 tx,
77 retry_count: 0,
78 in_flight: true,
79 },
80 )
81 }
82
83 pub(crate) fn reclaim(&self, coin: ObjectID) -> Result<Option<Entry>, TypedStoreError> {
87 match self.log.get(&coin) {
88 Ok(entry) => Ok(entry),
89 Err(TypedStoreError::Serialization(_)) => {
90 self.log
94 .remove(&coin)
95 .unwrap_or_else(|_| panic!("Coin: {coin:?} unable to be removed from log."));
96 Ok(None)
97 }
98 Err(err) => Err(err),
99 }
100 }
101
102 pub(crate) fn commit(&mut self, coin: ObjectID) -> Result<(), TypedStoreError> {
105 self.log.remove(&coin)
106 }
107
108 pub(crate) fn increment_retry_count(&mut self, coin: ObjectID) -> Result<(), TypedStoreError> {
109 if let Some(mut entry) = self.log.get(&coin)? {
110 entry.retry_count += 1;
111 self.log.insert(&coin, &entry)?;
112 }
113 Ok(())
114 }
115
116 pub(crate) fn set_in_flight(
117 &mut self,
118 coin: ObjectID,
119 bool: bool,
120 ) -> Result<(), TypedStoreError> {
121 if let Some(mut entry) = self.log.get(&coin)? {
122 entry.in_flight = bool;
123 self.log.insert(&coin, &entry)?;
124 } else {
125 info!(
126 ?coin,
127 "Attempted to set inflight a coin that was not in the WAL."
128 );
129
130 return Err(TypedStoreError::RocksDB(format!(
131 "Coin object {coin:?} not found in WAL."
132 )));
133 }
134 Ok(())
135 }
136}
137
#[cfg(test)]
mod tests {
    use iota_types::{
        base_types::{ObjectRef, random_object_ref},
        transaction::TEST_ONLY_GAS_UNIT_FOR_TRANSFER,
    };

    use super::*;

    /// Opens a WAL inside a brand-new temporary directory.
    ///
    /// The directory guard is handed back alongside the WAL so the caller can
    /// keep it alive until the test is finished with the store.
    fn fresh_wal() -> (tempfile::TempDir, WriteAheadLog) {
        let dir = tempfile::tempdir().unwrap();
        let wal = WriteAheadLog::open(&dir.path().join("wal"));
        (dir, wal)
    }

    /// Reclaims the entry for `coin`, panicking when the WAL has none.
    fn reclaim_present(wal: &WriteAheadLog, coin: ObjectID) -> Entry {
        match wal.reclaim(coin).unwrap() {
            Some(entry) => entry,
            None => panic!("Entry not found for {}", coin),
        }
    }

    /// Asserts that `entry` carries exactly the data that was reserved.
    fn assert_entry_matches(
        entry: &Entry,
        uuid: Uuid,
        recipient: IotaAddress,
        tx: &TransactionData,
    ) {
        assert_eq!(uuid, Uuid::from_bytes(entry.uuid));
        assert_eq!(recipient, entry.recipient);
        assert_eq!(tx, &entry.tx);
    }

    /// Builds a pay transaction spending `coin` (also used as gas) from a
    /// random sender to a random recipient, returning the recipient and the
    /// transaction.
    fn random_request(coin: ObjectRef) -> (IotaAddress, TransactionData) {
        let gas_price = 1;
        let sender = IotaAddress::random_for_testing_only();
        let recipient = IotaAddress::random_for_testing_only();
        let gas_budget = gas_price * TEST_ONLY_GAS_UNIT_FOR_TRANSFER;

        let tx = TransactionData::new_pay_iota(
            sender,
            vec![coin],
            vec![recipient],
            vec![1000],
            coin,
            gas_budget,
            gas_price,
        )
        .unwrap();

        (recipient, tx)
    }

    #[tokio::test]
    async fn reserve_reclaim_reclaim() {
        let (_dir, mut wal) = fresh_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv, tx) = random_request(coin);

        assert!(wal.reserve(uuid, coin.0, recv, tx.clone()).is_ok());

        // Reclaiming does not consume the entry: two reads in a row must
        // both observe the reserved data.
        for _ in 0..2 {
            let entry = reclaim_present(&wal, coin.0);
            assert_entry_matches(&entry, uuid, recv, &tx);
        }
    }

    #[tokio::test]
    async fn test_increment_wal() {
        let (_dir, mut wal) = fresh_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv0, tx0) = random_request(coin);

        wal.reserve(uuid, coin.0, recv0, tx0).unwrap();
        wal.increment_retry_count(coin.0).unwrap();

        let retries = wal.reclaim(coin.0).unwrap().unwrap().retry_count;
        assert_eq!(retries, 1);
    }

    #[tokio::test]
    async fn reserve_reserve() {
        let (_dir, mut wal) = fresh_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv0, tx0) = random_request(coin);
        let (recv1, tx1) = random_request(coin);

        wal.reserve(uuid, coin.0, recv0, tx0).unwrap();

        // A second reservation for the same coin must be rejected.
        let duplicate = wal.reserve(uuid, coin.0, recv1, tx1);
        assert!(matches!(
            duplicate,
            Err(TypedStoreError::Serialization(_)),
        ));
    }

    #[tokio::test]
    async fn reserve_reclaim_commit_reclaim() {
        let (_dir, mut wal) = fresh_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv, tx) = random_request(coin);

        wal.reserve(uuid, coin.0, recv, tx.clone()).unwrap();

        let entry = reclaim_present(&wal, coin.0);
        assert_entry_matches(&entry, uuid, recv, &tx);

        wal.commit(coin.0).unwrap();

        // Once committed, the coin no longer has an entry.
        assert_eq!(Ok(None), wal.reclaim(coin.0));
    }

    #[tokio::test]
    async fn reserve_commit_reserve() {
        let (_dir, mut wal) = fresh_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv0, tx0) = random_request(coin);
        let (recv1, tx1) = random_request(coin);

        wal.reserve(uuid, coin.0, recv0, tx0).unwrap();
        wal.commit(coin.0).unwrap();

        // Committing frees the coin, so it can be reserved again.
        wal.reserve(uuid, coin.0, recv1, tx1).unwrap();
    }
}