iota_bridge/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, path::Path, sync::Arc};
6
7use iota_types::{Identifier, event::EventID};
8use typed_store::{
9    DBMapUtils, Map,
10    rocks::{DBMap, MetricConf},
11    traits::{TableSummary, TypedStoreDebug},
12};
13
14use crate::{
15    error::{BridgeError, BridgeResult},
16    types::{BridgeAction, BridgeActionDigest},
17};
18
19#[derive(DBMapUtils)]
20pub struct BridgeOrchestratorTables {
21    /// pending BridgeActions that orchestrator received but not yet executed
22    pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
23    /// module identifier to the last processed EventID
24    pub(crate) iota_syncer_cursors: DBMap<Identifier, EventID>,
25    /// contract address to the last processed block
26    pub(crate) eth_syncer_cursors: DBMap<ethers::types::Address, u64>,
27}
28
29impl BridgeOrchestratorTables {
30    pub fn new(path: &Path) -> Arc<Self> {
31        Arc::new(Self::open_tables_read_write(
32            path.to_path_buf(),
33            MetricConf::new("bridge"),
34            None,
35            None,
36        ))
37    }
38
39    pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
40        let mut batch = self.pending_actions.batch();
41        batch
42            .insert_batch(
43                &self.pending_actions,
44                actions.iter().map(|a| (a.digest(), a)),
45            )
46            .map_err(|e| {
47                BridgeError::Storage(format!("Couldn't insert into pending_actions: {:?}", e))
48            })?;
49        batch
50            .write()
51            .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
52    }
53
54    pub(crate) fn remove_pending_actions(
55        &self,
56        actions: &[BridgeActionDigest],
57    ) -> BridgeResult<()> {
58        let mut batch = self.pending_actions.batch();
59        batch
60            .delete_batch(&self.pending_actions, actions)
61            .map_err(|e| {
62                BridgeError::Storage(format!("Couldn't delete from pending_actions: {:?}", e))
63            })?;
64        batch
65            .write()
66            .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
67    }
68
69    pub(crate) fn update_iota_event_cursor(
70        &self,
71        module: Identifier,
72        cursor: EventID,
73    ) -> BridgeResult<()> {
74        let mut batch = self.iota_syncer_cursors.batch();
75
76        batch
77            .insert_batch(&self.iota_syncer_cursors, [(module, cursor)])
78            .map_err(|e| {
79                BridgeError::Storage(format!("Couldn't insert into iota_syncer_cursors: {:?}", e))
80            })?;
81        batch
82            .write()
83            .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
84    }
85
86    pub(crate) fn update_eth_event_cursor(
87        &self,
88        contract_address: ethers::types::Address,
89        cursor: u64,
90    ) -> BridgeResult<()> {
91        let mut batch = self.eth_syncer_cursors.batch();
92
93        batch
94            .insert_batch(&self.eth_syncer_cursors, [(contract_address, cursor)])
95            .map_err(|e| {
96                BridgeError::Storage(format!("Couldn't insert into eth_syncer_cursors: {:?}", e))
97            })?;
98        batch
99            .write()
100            .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
101    }
102
103    pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
104        self.pending_actions.unbounded_iter().collect()
105    }
106
107    pub fn get_iota_event_cursors(
108        &self,
109        identifiers: &[Identifier],
110    ) -> BridgeResult<Vec<Option<EventID>>> {
111        self.iota_syncer_cursors
112            .multi_get(identifiers)
113            .map_err(|e| BridgeError::Storage(format!("Couldn't get iota_syncer_cursors: {:?}", e)))
114    }
115
116    pub fn get_eth_event_cursors(
117        &self,
118        contract_addresses: &[ethers::types::Address],
119    ) -> BridgeResult<Vec<Option<u64>>> {
120        self.eth_syncer_cursors
121            .multi_get(contract_addresses)
122            .map_err(|e| BridgeError::Storage(format!("Couldn't get iota_syncer_cursors: {:?}", e)))
123    }
124}
125
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use iota_types::digests::TransactionDigest;

    use super::*;
    use crate::test_utils::get_test_iota_to_eth_bridge_action;

    /// Walks the store through its basic operations: inserting and removing
    /// pending actions (including repeated inserts and removal of a missing
    /// entry), then writing and reading back one ETH and one IOTA cursor.
    // async: existing runtime is required with typed-store
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_bridge_storage_basic() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let action1 = get_test_iota_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );
        let action2 = get_test_iota_to_eth_bridge_action(
            None,
            Some(2),
            Some(100),
            Some(10000),
            None,
            None,
            None,
        );

        // A fresh store has no pending actions.
        assert!(store.get_all_pending_actions().is_empty());

        // Removing an entry that was never inserted is not an error.
        store.remove_pending_actions(&[action1.digest()]).unwrap();

        // Insert both actions and read them back.
        store
            .insert_pending_actions(&[action1.clone(), action2.clone()])
            .unwrap();
        let both_actions = HashMap::from([
            (action1.digest(), action1.clone()),
            (action2.digest(), action2.clone()),
        ]);
        assert_eq!(store.get_all_pending_actions(), both_actions);

        // Re-inserting an existing action leaves the contents unchanged.
        store.insert_pending_actions(&[action1.clone()]).unwrap();
        assert_eq!(store.get_all_pending_actions(), both_actions);

        // Remove action 2; only action 1 remains.
        store.remove_pending_actions(&[action2.digest()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions(),
            HashMap::from([(action1.digest(), action1.clone())])
        );

        // Remove action 1; the table is empty again.
        store.remove_pending_actions(&[action1.digest()]).unwrap();
        assert!(store.get_all_pending_actions().is_empty());

        // ETH event cursor: absent first, then readable after the update.
        let eth_contract_address = ethers::types::Address::random();
        let eth_block_num = 199999u64;
        let eth_cursors = store
            .get_eth_event_cursors(&[eth_contract_address])
            .unwrap();
        assert!(eth_cursors[0].is_none());
        store
            .update_eth_event_cursor(eth_contract_address, eth_block_num)
            .unwrap();
        let eth_cursors = store
            .get_eth_event_cursors(&[eth_contract_address])
            .unwrap();
        assert_eq!(eth_cursors[0], Some(eth_block_num));

        // IOTA event cursor: absent first, then readable after the update.
        let iota_module = Identifier::from_str("test").unwrap();
        let iota_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 1,
        };
        let iota_cursors = store
            .get_iota_event_cursors(&[iota_module.clone()])
            .unwrap();
        assert!(iota_cursors[0].is_none());
        store
            .update_iota_event_cursor(iota_module.clone(), iota_cursor)
            .unwrap();
        let iota_cursors = store
            .get_iota_event_cursors(&[iota_module.clone()])
            .unwrap();
        assert_eq!(iota_cursors[0], Some(iota_cursor));
    }
}
249}