1use std::{collections::HashMap, path::Path, sync::Arc};
6
7use iota_types::{Identifier, event::EventID};
8use typed_store::{
9 DBMapUtils, Map,
10 rocks::{DBMap, MetricConf},
11 traits::{TableSummary, TypedStoreDebug},
12};
13
14use crate::{
15 error::{BridgeError, BridgeResult},
16 types::{BridgeAction, BridgeActionDigest},
17};
18
19#[derive(DBMapUtils)]
20pub struct BridgeOrchestratorTables {
21 pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
23 pub(crate) iota_syncer_cursors: DBMap<Identifier, EventID>,
25 pub(crate) eth_syncer_cursors: DBMap<ethers::types::Address, u64>,
27}
28
29impl BridgeOrchestratorTables {
30 pub fn new(path: &Path) -> Arc<Self> {
31 Arc::new(Self::open_tables_read_write(
32 path.to_path_buf(),
33 MetricConf::new("bridge"),
34 None,
35 None,
36 ))
37 }
38
39 pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
40 let mut batch = self.pending_actions.batch();
41 batch
42 .insert_batch(
43 &self.pending_actions,
44 actions.iter().map(|a| (a.digest(), a)),
45 )
46 .map_err(|e| {
47 BridgeError::Storage(format!("Couldn't insert into pending_actions: {:?}", e))
48 })?;
49 batch
50 .write()
51 .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
52 }
53
54 pub(crate) fn remove_pending_actions(
55 &self,
56 actions: &[BridgeActionDigest],
57 ) -> BridgeResult<()> {
58 let mut batch = self.pending_actions.batch();
59 batch
60 .delete_batch(&self.pending_actions, actions)
61 .map_err(|e| {
62 BridgeError::Storage(format!("Couldn't delete from pending_actions: {:?}", e))
63 })?;
64 batch
65 .write()
66 .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
67 }
68
69 pub(crate) fn update_iota_event_cursor(
70 &self,
71 module: Identifier,
72 cursor: EventID,
73 ) -> BridgeResult<()> {
74 let mut batch = self.iota_syncer_cursors.batch();
75
76 batch
77 .insert_batch(&self.iota_syncer_cursors, [(module, cursor)])
78 .map_err(|e| {
79 BridgeError::Storage(format!("Couldn't insert into iota_syncer_cursors: {:?}", e))
80 })?;
81 batch
82 .write()
83 .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
84 }
85
86 pub(crate) fn update_eth_event_cursor(
87 &self,
88 contract_address: ethers::types::Address,
89 cursor: u64,
90 ) -> BridgeResult<()> {
91 let mut batch = self.eth_syncer_cursors.batch();
92
93 batch
94 .insert_batch(&self.eth_syncer_cursors, [(contract_address, cursor)])
95 .map_err(|e| {
96 BridgeError::Storage(format!("Couldn't insert into eth_syncer_cursors: {:?}", e))
97 })?;
98 batch
99 .write()
100 .map_err(|e| BridgeError::Storage(format!("Couldn't write batch: {:?}", e)))
101 }
102
103 pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
104 self.pending_actions.unbounded_iter().collect()
105 }
106
107 pub fn get_iota_event_cursors(
108 &self,
109 identifiers: &[Identifier],
110 ) -> BridgeResult<Vec<Option<EventID>>> {
111 self.iota_syncer_cursors
112 .multi_get(identifiers)
113 .map_err(|e| BridgeError::Storage(format!("Couldn't get iota_syncer_cursors: {:?}", e)))
114 }
115
116 pub fn get_eth_event_cursors(
117 &self,
118 contract_addresses: &[ethers::types::Address],
119 ) -> BridgeResult<Vec<Option<u64>>> {
120 self.eth_syncer_cursors
121 .multi_get(contract_addresses)
122 .map_err(|e| BridgeError::Storage(format!("Couldn't get iota_syncer_cursors: {:?}", e)))
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use std::str::FromStr;
129
130 use iota_types::digests::TransactionDigest;
131
132 use super::*;
133 use crate::test_utils::get_test_iota_to_eth_bridge_action;
134
135 #[tokio::test]
137 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
138 async fn test_bridge_storage_basic() {
139 let temp_dir = tempfile::tempdir().unwrap();
140 let store = BridgeOrchestratorTables::new(temp_dir.path());
141
142 let action1 = get_test_iota_to_eth_bridge_action(
143 None,
144 Some(0),
145 Some(99),
146 Some(10000),
147 None,
148 None,
149 None,
150 );
151
152 let action2 = get_test_iota_to_eth_bridge_action(
153 None,
154 Some(2),
155 Some(100),
156 Some(10000),
157 None,
158 None,
159 None,
160 );
161
162 let actions = store.get_all_pending_actions();
164 assert!(actions.is_empty());
165
166 store.remove_pending_actions(&[action1.digest()]).unwrap();
168
169 store
170 .insert_pending_actions(&vec![action1.clone(), action2.clone()])
171 .unwrap();
172
173 let actions = store.get_all_pending_actions();
174 assert_eq!(
175 actions,
176 HashMap::from_iter(vec![
177 (action1.digest(), action1.clone()),
178 (action2.digest(), action2.clone())
179 ])
180 );
181
182 store.insert_pending_actions(&[action1.clone()]).unwrap();
184 let actions = store.get_all_pending_actions();
185 assert_eq!(
186 actions,
187 HashMap::from_iter(vec![
188 (action1.digest(), action1.clone()),
189 (action2.digest(), action2.clone())
190 ])
191 );
192
193 store.remove_pending_actions(&[action2.digest()]).unwrap();
195 let actions = store.get_all_pending_actions();
196 assert_eq!(
197 actions,
198 HashMap::from_iter(vec![(action1.digest(), action1.clone())])
199 );
200
201 store.remove_pending_actions(&[action1.digest()]).unwrap();
203 let actions = store.get_all_pending_actions();
204 assert!(actions.is_empty());
205
206 let eth_contract_address = ethers::types::Address::random();
208 let eth_block_num = 199999u64;
209 assert!(
210 store
211 .get_eth_event_cursors(&[eth_contract_address])
212 .unwrap()[0]
213 .is_none()
214 );
215 store
216 .update_eth_event_cursor(eth_contract_address, eth_block_num)
217 .unwrap();
218 assert_eq!(
219 store
220 .get_eth_event_cursors(&[eth_contract_address])
221 .unwrap()[0]
222 .unwrap(),
223 eth_block_num
224 );
225
226 let iota_module = Identifier::from_str("test").unwrap();
228 let iota_cursor = EventID {
229 tx_digest: TransactionDigest::random(),
230 event_seq: 1,
231 };
232 assert!(
233 store
234 .get_iota_event_cursors(&[iota_module.clone()])
235 .unwrap()[0]
236 .is_none()
237 );
238 store
239 .update_iota_event_cursor(iota_module.clone(), iota_cursor)
240 .unwrap();
241 assert_eq!(
242 store
243 .get_iota_event_cursors(&[iota_module.clone()])
244 .unwrap()[0]
245 .unwrap(),
246 iota_cursor
247 );
248 }
249}