iota_faucet/faucet/
write_ahead_log.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::path::Path;
6
7use iota_types::{
8    base_types::{IotaAddress, ObjectID},
9    transaction::TransactionData,
10};
11use serde::{Deserialize, Serialize};
12use tracing::info;
13use typed_store::{
14    DBMapUtils, Map, TypedStoreError,
15    rocks::DBMap,
16    traits::{TableSummary, TypedStoreDebug},
17};
18use uuid::Uuid;
19
20/// Persistent log of transactions paying out iota from the faucet, keyed by the
21/// coin serving the request.  Transactions are expected to be written to the
22/// log before they are sent to full-node, and removed after receiving a
23/// response back, before the coin becomes available for subsequent writes.
24///
25/// This allows the faucet to go down and back up, and not forget which requests
26/// were in-flight that it needs to confirm succeeded or failed.
27#[derive(DBMapUtils, Clone)]
28pub struct WriteAheadLog {
29    pub log: DBMap<ObjectID, Entry>,
30}
31
32#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
33pub struct Entry {
34    pub uuid: uuid::Bytes,
35    // TODO (jian): remove recipient
36    pub recipient: IotaAddress,
37    pub tx: TransactionData,
38    pub retry_count: u64,
39    pub in_flight: bool,
40}
41
42impl WriteAheadLog {
43    pub(crate) fn open(path: &Path) -> Self {
44        Self::open_tables_read_write(
45            path.to_path_buf(),
46            typed_store::rocks::MetricConf::new("faucet_write_ahead_log"),
47            None,
48            None,
49        )
50    }
51
52    /// Mark `coin` as reserved for transaction `tx` sending coin to
53    /// `recipient`. Fails if `coin` is already in the WAL pointing to an
54    /// existing transaction.
55    pub(crate) fn reserve(
56        &mut self,
57        uuid: Uuid,
58        coin: ObjectID,
59        recipient: IotaAddress,
60        tx: TransactionData,
61    ) -> Result<(), TypedStoreError> {
62        if self.log.contains_key(&coin)? {
63            // Don't permit multiple writes against the same coin
64            // TODO: Use a better error type than `TypedStoreError`.
65            return Err(TypedStoreError::Serialization(format!(
66                "Duplicate WAL entry for coin {coin:?}",
67            )));
68        }
69
70        let uuid = *uuid.as_bytes();
71        self.log.insert(
72            &coin,
73            &Entry {
74                uuid,
75                recipient,
76                tx,
77                retry_count: 0,
78                in_flight: true,
79            },
80        )
81    }
82
83    /// Check whether `coin` has a pending transaction in the WAL.  Returns
84    /// `Ok(Some(entry))` if a pending transaction exists, `Ok(None)` if
85    /// not, and `Err(_)` if there was an internal error accessing the WAL.
86    pub(crate) fn reclaim(&self, coin: ObjectID) -> Result<Option<Entry>, TypedStoreError> {
87        match self.log.get(&coin) {
88            Ok(entry) => Ok(entry),
89            Err(TypedStoreError::Serialization(_)) => {
90                // Remove bad log from the store, so we don't crash on start up, this can happen
91                // if we update the WAL Entry and have some leftover Entry from
92                // the WAL.
93                self.log
94                    .remove(&coin)
95                    .unwrap_or_else(|_| panic!("Coin: {coin:?} unable to be removed from log."));
96                Ok(None)
97            }
98            Err(err) => Err(err),
99        }
100    }
101
102    /// Indicate that the transaction in flight for `coin` has landed, and the
103    /// entry in the WAL can be removed.
104    pub(crate) fn commit(&mut self, coin: ObjectID) -> Result<(), TypedStoreError> {
105        self.log.remove(&coin)
106    }
107
108    pub(crate) fn increment_retry_count(&mut self, coin: ObjectID) -> Result<(), TypedStoreError> {
109        if let Some(mut entry) = self.log.get(&coin)? {
110            entry.retry_count += 1;
111            self.log.insert(&coin, &entry)?;
112        }
113        Ok(())
114    }
115
116    pub(crate) fn set_in_flight(
117        &mut self,
118        coin: ObjectID,
119        bool: bool,
120    ) -> Result<(), TypedStoreError> {
121        if let Some(mut entry) = self.log.get(&coin)? {
122            entry.in_flight = bool;
123            self.log.insert(&coin, &entry)?;
124        } else {
125            info!(
126                ?coin,
127                "Attempted to set inflight a coin that was not in the WAL."
128            );
129
130            return Err(TypedStoreError::RocksDB(format!(
131                "Coin object {coin:?} not found in WAL."
132            )));
133        }
134        Ok(())
135    }
136}
137
#[cfg(test)]
mod tests {
    use iota_types::{
        base_types::{ObjectRef, random_object_ref},
        transaction::TEST_ONLY_GAS_UNIT_FOR_TRANSFER,
    };

    use super::*;

    /// Reserving a coin leaves an entry that can be reclaimed repeatedly.
    #[tokio::test]
    async fn reserve_reclaim_reclaim() {
        let (_tmp, mut wal) = open_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv, tx) = random_request(coin);

        wal.reserve(uuid, coin.0, recv, tx.clone()).unwrap();

        // Reclaim once
        assert_entry_matches(&expect_entry(&wal, coin.0), uuid, recv, &tx);

        // Reclaim again, should still be there.
        assert_entry_matches(&expect_entry(&wal, coin.0), uuid, recv, &tx);
    }

    /// A single increment takes the retry count from 0 to 1.
    #[tokio::test]
    async fn test_increment_wal() {
        let (_tmp, mut wal) = open_wal();
        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv0, tx0) = random_request(coin);

        // Reserve the coin, then bump its retry count once.
        wal.reserve(uuid, coin.0, recv0, tx0).unwrap();
        wal.increment_retry_count(coin.0).unwrap();

        let entry = wal.reclaim(coin.0).unwrap().unwrap();
        assert_eq!(entry.retry_count, 1);
    }

    /// Incrementing the retry count of an unknown coin is a no-op.
    #[tokio::test]
    async fn increment_retry_count_missing_coin() {
        let (_tmp, mut wal) = open_wal();
        let coin = random_object_ref();

        wal.increment_retry_count(coin.0).unwrap();

        // No entry must have been created as a side effect.
        assert_eq!(Ok(None), wal.reclaim(coin.0));
    }

    /// A coin cannot be reserved twice without a commit in between.
    #[tokio::test]
    async fn reserve_reserve() {
        let (_tmp, mut wal) = open_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv0, tx0) = random_request(coin);
        let (recv1, tx1) = random_request(coin);

        // First write goes through
        wal.reserve(uuid, coin.0, recv0, tx0).unwrap();

        // Second write fails because it tries to write to the same coin
        assert!(matches!(
            wal.reserve(uuid, coin.0, recv1, tx1),
            Err(TypedStoreError::Serialization(_)),
        ));
    }

    /// Committing removes the entry, so a later reclaim finds nothing.
    #[tokio::test]
    async fn reserve_reclaim_commit_reclaim() {
        let (_tmp, mut wal) = open_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv, tx) = random_request(coin);

        wal.reserve(uuid, coin.0, recv, tx.clone()).unwrap();

        // Reclaim to show that the entry is there
        assert_entry_matches(&expect_entry(&wal, coin.0), uuid, recv, &tx);

        // Commit the transaction, which removes it from the log.
        wal.commit(coin.0).unwrap();

        // Expect it to now be gone
        assert_eq!(Ok(None), wal.reclaim(coin.0));
    }

    /// After a commit the same coin can be reserved again.
    #[tokio::test]
    async fn reserve_commit_reserve() {
        let (_tmp, mut wal) = open_wal();

        let uuid = Uuid::new_v4();
        let coin = random_object_ref();
        let (recv0, tx0) = random_request(coin);
        let (recv1, tx1) = random_request(coin);

        // Write the transaction
        wal.reserve(uuid, coin.0, recv0, tx0).unwrap();

        // Commit the transaction, which removes it from the log.
        wal.commit(coin.0).unwrap();

        // Write a fresh transaction, which should now pass
        wal.reserve(uuid, coin.0, recv1, tx1).unwrap();
    }

    /// `set_in_flight` overwrites the flag stored by `reserve`.
    #[tokio::test]
    async fn set_in_flight_updates_entry() {
        let (_tmp, mut wal) = open_wal();

        let coin = random_object_ref();
        let (recv, tx) = random_request(coin);
        wal.reserve(Uuid::new_v4(), coin.0, recv, tx).unwrap();

        // Freshly reserved entries start out in flight.
        assert!(expect_entry(&wal, coin.0).in_flight);

        wal.set_in_flight(coin.0, false).unwrap();
        assert!(!expect_entry(&wal, coin.0).in_flight);

        wal.set_in_flight(coin.0, true).unwrap();
        assert!(expect_entry(&wal, coin.0).in_flight);
    }

    /// `set_in_flight` on an unknown coin fails and creates no entry.
    #[tokio::test]
    async fn set_in_flight_missing_coin() {
        let (_tmp, mut wal) = open_wal();
        let coin = random_object_ref();

        assert!(matches!(
            wal.set_in_flight(coin.0, true),
            Err(TypedStoreError::RocksDB(_)),
        ));
        assert_eq!(Ok(None), wal.reclaim(coin.0));
    }

    /// Opens a WAL inside a fresh temporary directory. The directory handle is
    /// returned as well because it must outlive the WAL: dropping it deletes
    /// the directory.
    fn open_wal() -> (tempfile::TempDir, WriteAheadLog) {
        let tmp = tempfile::tempdir().unwrap();
        let wal = WriteAheadLog::open(&tmp.path().join("wal"));
        (tmp, wal)
    }

    /// Reclaims the entry for `coin`, panicking if the WAL has none.
    fn expect_entry(wal: &WriteAheadLog, coin: ObjectID) -> Entry {
        wal.reclaim(coin)
            .unwrap()
            .unwrap_or_else(|| panic!("Entry not found for {}", coin))
    }

    /// Asserts that `entry` carries the request data that was reserved.
    fn assert_entry_matches(entry: &Entry, uuid: Uuid, recv: IotaAddress, tx: &TransactionData) {
        assert_eq!(uuid, Uuid::from_bytes(entry.uuid));
        assert_eq!(recv, entry.recipient);
        assert_eq!(tx, &entry.tx);
    }

    /// Builds a pay-out of 1000 from a random sender to a random recipient,
    /// using `coin` both as the input coin and as the gas payment. Returns the
    /// recipient together with the transaction.
    fn random_request(coin: ObjectRef) -> (IotaAddress, TransactionData) {
        let gas_price = 1;
        let send = IotaAddress::random_for_testing_only();
        let recv = IotaAddress::random_for_testing_only();
        (
            recv,
            TransactionData::new_pay_iota(
                send,
                vec![coin],
                vec![recv],
                vec![1000],
                coin,
                gas_price * TEST_ONLY_GAS_UNIT_FOR_TRANSFER,
                gas_price,
            )
            .unwrap(),
        )
    }
}