1use std::{
6 collections::HashMap,
7 net::{IpAddr, Ipv4Addr, SocketAddr},
8 sync::Arc,
9 time::Duration,
10};
11
12use arc_swap::ArcSwap;
13use ethers::types::Address as EthAddress;
14use iota_metrics::spawn_logged_monitored_task;
15use iota_types::{
16 Identifier,
17 bridge::{
18 BRIDGE_COMMITTEE_MODULE_NAME, BRIDGE_LIMITER_MODULE_NAME, BRIDGE_MODULE_NAME,
19 BRIDGE_TREASURY_MODULE_NAME,
20 },
21 event::EventID,
22};
23use tokio::task::JoinHandle;
24use tracing::info;
25
26use crate::{
27 action_executor::BridgeActionExecutor,
28 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
29 config::{BridgeClientConfig, BridgeNodeConfig},
30 eth_syncer::EthSyncer,
31 events::init_all_struct_tags,
32 iota_syncer::IotaSyncer,
33 metrics::BridgeMetrics,
34 monitor::BridgeMonitor,
35 orchestrator::BridgeOrchestrator,
36 server::{BridgeNodePublicMetadata, handler::BridgeRequestHandler, run_server},
37 storage::BridgeOrchestratorTables,
38};
39
40pub async fn run_bridge_node(
41 config: BridgeNodeConfig,
42 metadata: BridgeNodePublicMetadata,
43 prometheus_registry: prometheus::Registry,
44) -> anyhow::Result<JoinHandle<()>> {
45 init_all_struct_tags();
46 let metrics = Arc::new(BridgeMetrics::new(&prometheus_registry));
47 let (server_config, client_config) = config.validate(metrics.clone()).await?;
48
49 let _handles = if let Some(client_config) = client_config {
51 start_client_components(client_config, metrics.clone()).await
52 } else {
53 Ok(vec![])
54 }?;
55
56 let socket_address = SocketAddr::new(
58 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
59 server_config.server_listen_port,
60 );
61 Ok(run_server(
62 &socket_address,
63 BridgeRequestHandler::new(
64 server_config.key,
65 server_config.iota_client,
66 server_config.eth_client,
67 server_config.approved_governance_actions,
68 metrics.clone(),
69 ),
70 metrics,
71 Arc::new(metadata),
72 ))
73}
74
75async fn start_client_components(
77 client_config: BridgeClientConfig,
78 metrics: Arc<BridgeMetrics>,
79) -> anyhow::Result<Vec<JoinHandle<()>>> {
80 let store: std::sync::Arc<BridgeOrchestratorTables> =
81 BridgeOrchestratorTables::new(&client_config.db_path.join("client"));
82 let iota_modules_to_watch = get_iota_modules_to_watch(
83 &store,
84 client_config.iota_bridge_module_last_processed_event_id_override,
85 );
86 let eth_contracts_to_watch = get_eth_contracts_to_watch(
87 &store,
88 &client_config.eth_contracts,
89 client_config.eth_contracts_start_block_fallback,
90 client_config.eth_contracts_start_block_override,
91 );
92
93 let iota_client = client_config.iota_client.clone();
94
95 let mut all_handles = vec![];
96 let (task_handles, eth_events_rx, _) =
97 EthSyncer::new(client_config.eth_client.clone(), eth_contracts_to_watch)
98 .run(metrics.clone())
99 .await
100 .expect("Failed to start eth syncer");
101 all_handles.extend(task_handles);
102
103 let (task_handles, iota_events_rx) =
104 IotaSyncer::new(client_config.iota_client, iota_modules_to_watch)
105 .run(Duration::from_secs(2))
106 .await
107 .expect("Failed to start iota syncer");
108 all_handles.extend(task_handles);
109
110 let committee = Arc::new(
111 iota_client
112 .get_bridge_committee()
113 .await
114 .expect("Failed to get committee"),
115 );
116 let bridge_auth_agg = Arc::new(ArcSwap::from(Arc::new(BridgeAuthorityAggregator::new(
117 committee,
118 ))));
119 let iota_token_type_tags = iota_client.get_token_id_map().await.unwrap();
121 let is_bridge_paused = iota_client.is_bridge_paused().await.unwrap();
122
123 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(is_bridge_paused);
124
125 let (monitor_tx, monitor_rx) = iota_metrics::metered_channel::channel(
126 10000,
127 &iota_metrics::get_metrics()
128 .unwrap()
129 .channel_inflight
130 .with_label_values(&["monitor_queue"]),
131 );
132 let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(iota_token_type_tags)));
133 let bridge_action_executor = BridgeActionExecutor::new(
134 iota_client.clone(),
135 bridge_auth_agg.clone(),
136 store.clone(),
137 client_config.key,
138 client_config.iota_address,
139 client_config.gas_object_ref.0,
140 iota_token_type_tags.clone(),
141 bridge_pause_rx,
142 metrics.clone(),
143 )
144 .await;
145
146 let monitor = BridgeMonitor::new(
147 iota_client.clone(),
148 monitor_rx,
149 bridge_auth_agg.clone(),
150 bridge_pause_tx,
151 iota_token_type_tags,
152 );
153 all_handles.push(spawn_logged_monitored_task!(monitor.run()));
154
155 let orchestrator = BridgeOrchestrator::new(
156 iota_client,
157 iota_events_rx,
158 eth_events_rx,
159 store.clone(),
160 monitor_tx,
161 metrics,
162 );
163
164 all_handles.extend(orchestrator.run(bridge_action_executor).await);
165 Ok(all_handles)
166}
167
168fn get_iota_modules_to_watch(
169 store: &std::sync::Arc<BridgeOrchestratorTables>,
170 iota_bridge_module_last_processed_event_id_override: Option<EventID>,
171) -> HashMap<Identifier, Option<EventID>> {
172 let iota_bridge_modules = vec![
173 BRIDGE_MODULE_NAME.to_owned(),
174 BRIDGE_COMMITTEE_MODULE_NAME.to_owned(),
175 BRIDGE_TREASURY_MODULE_NAME.to_owned(),
176 BRIDGE_LIMITER_MODULE_NAME.to_owned(),
177 ];
178 if let Some(cursor) = iota_bridge_module_last_processed_event_id_override {
179 info!("Overriding cursor for iota bridge modules to {:?}", cursor);
180 return HashMap::from_iter(
181 iota_bridge_modules
182 .iter()
183 .map(|module| (module.clone(), Some(cursor))),
184 );
185 }
186
187 let iota_bridge_module_stored_cursor = store
188 .get_iota_event_cursors(&iota_bridge_modules)
189 .expect("Failed to get eth iota event cursors from storage");
190 let mut iota_modules_to_watch = HashMap::new();
191 for (module_identifier, cursor) in iota_bridge_modules
192 .iter()
193 .zip(iota_bridge_module_stored_cursor)
194 {
195 if cursor.is_none() {
196 info!(
197 "No cursor found for iota bridge module {} in storage or config override, query start from the beginning.",
198 module_identifier
199 );
200 }
201 iota_modules_to_watch.insert(module_identifier.clone(), cursor);
202 }
203 iota_modules_to_watch
204}
205
206fn get_eth_contracts_to_watch(
207 store: &std::sync::Arc<BridgeOrchestratorTables>,
208 eth_contracts: &[EthAddress],
209 eth_contracts_start_block_fallback: u64,
210 eth_contracts_start_block_override: Option<u64>,
211) -> HashMap<EthAddress, u64> {
212 let stored_eth_cursors = store
213 .get_eth_event_cursors(eth_contracts)
214 .expect("Failed to get eth event cursors from storage");
215 let mut eth_contracts_to_watch = HashMap::new();
216 for (contract, stored_cursor) in eth_contracts.iter().zip(stored_eth_cursors) {
217 match (eth_contracts_start_block_override, stored_cursor) {
221 (Some(override_), _) => {
222 eth_contracts_to_watch.insert(*contract, override_);
223 info!(
224 "Overriding cursor for eth bridge contract {} to {}. Stored cursor: {:?}",
225 contract, override_, stored_cursor
226 );
227 }
228 (None, Some(stored_cursor)) => {
229 eth_contracts_to_watch.insert(*contract, stored_cursor + 1);
232 }
233 (None, None) => {
234 eth_contracts_to_watch.insert(*contract, eth_contracts_start_block_fallback);
236 }
237 }
238 }
239 eth_contracts_to_watch
240}
241
242#[cfg(test)]
243mod tests {
244 use ethers::types::Address as EthAddress;
245 use fastcrypto::secp256k1::Secp256k1KeyPair;
246 use iota_config::local_ip_utils::get_available_port;
247 use iota_types::{
248 base_types::IotaAddress,
249 bridge::BridgeChainId,
250 crypto::{EncodeDecodeBase64, IotaKeyPair, KeypairTraits, get_key_pair},
251 digests::TransactionDigest,
252 event::EventID,
253 };
254 use prometheus::Registry;
255 use tempfile::tempdir;
256
257 use super::*;
258 use crate::{
259 config::{BridgeNodeConfig, EthConfig, IotaConfig},
260 e2e_tests::test_utils::{BridgeTestCluster, BridgeTestClusterBuilder},
261 utils::wait_for_server_to_be_up,
262 };
263
264 #[tokio::test]
265 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
266 async fn test_get_eth_contracts_to_watch() {
267 telemetry_subscribers::init_for_testing();
268 let temp_dir = tempfile::tempdir().unwrap();
269 let eth_contracts = vec![
270 EthAddress::from_low_u64_be(1),
271 EthAddress::from_low_u64_be(2),
272 ];
273 let store = BridgeOrchestratorTables::new(temp_dir.path());
274
275 let contracts = get_eth_contracts_to_watch(&store, ð_contracts, 10, None);
277 assert_eq!(
278 contracts,
279 vec![(eth_contracts[0], 10), (eth_contracts[1], 10)]
280 .into_iter()
281 .collect::<HashMap<_, _>>()
282 );
283
284 let contracts = get_eth_contracts_to_watch(&store, ð_contracts, 10, Some(420));
286 assert_eq!(
287 contracts,
288 vec![(eth_contracts[0], 420), (eth_contracts[1], 420)]
289 .into_iter()
290 .collect::<HashMap<_, _>>()
291 );
292
293 store
294 .update_eth_event_cursor(eth_contracts[0], 100)
295 .unwrap();
296 store
297 .update_eth_event_cursor(eth_contracts[1], 102)
298 .unwrap();
299
300 let contracts = get_eth_contracts_to_watch(&store, ð_contracts, 10, None);
302 assert_eq!(
303 contracts,
304 vec![(eth_contracts[0], 101), (eth_contracts[1], 103)]
305 .into_iter()
306 .collect::<HashMap<_, _>>()
307 );
308
309 let contracts = get_eth_contracts_to_watch(&store, ð_contracts, 10, Some(200));
311 assert_eq!(
312 contracts,
313 vec![(eth_contracts[0], 200), (eth_contracts[1], 200)]
314 .into_iter()
315 .collect::<HashMap<_, _>>()
316 );
317 }
318
319 #[tokio::test]
320 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
321 async fn test_get_iota_modules_to_watch() {
322 telemetry_subscribers::init_for_testing();
323 let temp_dir = tempfile::tempdir().unwrap();
324
325 let store = BridgeOrchestratorTables::new(temp_dir.path());
326 let bridge_module = BRIDGE_MODULE_NAME.to_owned();
327 let committee_module = BRIDGE_COMMITTEE_MODULE_NAME.to_owned();
328 let treasury_module = BRIDGE_TREASURY_MODULE_NAME.to_owned();
329 let limiter_module = BRIDGE_LIMITER_MODULE_NAME.to_owned();
330 let iota_modules_to_watch = get_iota_modules_to_watch(&store, None);
332 assert_eq!(
333 iota_modules_to_watch,
334 vec![
335 (bridge_module.clone(), None),
336 (committee_module.clone(), None),
337 (treasury_module.clone(), None),
338 (limiter_module.clone(), None)
339 ]
340 .into_iter()
341 .collect::<HashMap<_, _>>()
342 );
343
344 let override_cursor = EventID {
346 tx_digest: TransactionDigest::random(),
347 event_seq: 42,
348 };
349 let iota_modules_to_watch = get_iota_modules_to_watch(&store, Some(override_cursor));
350 assert_eq!(
351 iota_modules_to_watch,
352 vec![
353 (bridge_module.clone(), Some(override_cursor)),
354 (committee_module.clone(), Some(override_cursor)),
355 (treasury_module.clone(), Some(override_cursor)),
356 (limiter_module.clone(), Some(override_cursor))
357 ]
358 .into_iter()
359 .collect::<HashMap<_, _>>()
360 );
361
362 let stored_cursor = EventID {
365 tx_digest: TransactionDigest::random(),
366 event_seq: 100,
367 };
368 store
369 .update_iota_event_cursor(bridge_module.clone(), stored_cursor)
370 .unwrap();
371 let iota_modules_to_watch = get_iota_modules_to_watch(&store, None);
372 assert_eq!(
373 iota_modules_to_watch,
374 vec![
375 (bridge_module.clone(), Some(stored_cursor)),
376 (committee_module.clone(), None),
377 (treasury_module.clone(), None),
378 (limiter_module.clone(), None)
379 ]
380 .into_iter()
381 .collect::<HashMap<_, _>>()
382 );
383
384 let stored_cursor = EventID {
386 tx_digest: TransactionDigest::random(),
387 event_seq: 100,
388 };
389 store
390 .update_iota_event_cursor(committee_module.clone(), stored_cursor)
391 .unwrap();
392 let iota_modules_to_watch = get_iota_modules_to_watch(&store, Some(override_cursor));
393 assert_eq!(
394 iota_modules_to_watch,
395 vec![
396 (bridge_module.clone(), Some(override_cursor)),
397 (committee_module.clone(), Some(override_cursor)),
398 (treasury_module.clone(), Some(override_cursor)),
399 (limiter_module.clone(), Some(override_cursor))
400 ]
401 .into_iter()
402 .collect::<HashMap<_, _>>()
403 );
404 }
405
406 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
407 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
408 async fn test_starting_bridge_node() {
409 telemetry_subscribers::init_for_testing();
410 let bridge_test_cluster = setup().await;
411 let kp = bridge_test_cluster.bridge_authority_key(0);
412
413 let tmp_dir = tempdir().unwrap().into_path();
415 let authority_key_path = "test_starting_bridge_node_bridge_authority_key";
416 let server_listen_port = get_available_port("127.0.0.1");
417 let base64_encoded = kp.encode_base64();
418 std::fs::write(tmp_dir.join(authority_key_path), base64_encoded).unwrap();
419
420 let config = BridgeNodeConfig {
421 server_listen_port,
422 metrics_port: get_available_port("127.0.0.1"),
423 bridge_authority_key_path: tmp_dir.join(authority_key_path),
424 iota: IotaConfig {
425 iota_rpc_url: bridge_test_cluster.iota_rpc_url(),
426 iota_bridge_chain_id: BridgeChainId::IotaCustom as u8,
427 bridge_client_key_path: None,
428 bridge_client_gas_object: None,
429 iota_bridge_module_last_processed_event_id_override: None,
430 },
431 eth: EthConfig {
432 eth_rpc_url: bridge_test_cluster.eth_rpc_url(),
433 eth_bridge_proxy_address: bridge_test_cluster.iota_bridge_address(),
434 eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
435 eth_contracts_start_block_fallback: None,
436 eth_contracts_start_block_override: None,
437 },
438 approved_governance_actions: vec![],
439 run_client: false,
440 db_path: None,
441 };
442 let _handle = run_bridge_node(
444 config,
445 BridgeNodePublicMetadata::empty_for_testing(),
446 Registry::new(),
447 )
448 .await
449 .unwrap();
450
451 let server_url = format!("http://127.0.0.1:{}", server_listen_port);
452 let res = wait_for_server_to_be_up(server_url, 5).await;
454 res.unwrap();
455 }
456
457 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
458 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
459 async fn test_starting_bridge_node_with_client() {
460 telemetry_subscribers::init_for_testing();
461 let bridge_test_cluster = setup().await;
462 let kp = bridge_test_cluster.bridge_authority_key(0);
463
464 let tmp_dir = tempdir().unwrap().into_path();
466 let db_path = tmp_dir.join("test_starting_bridge_node_with_client_db");
467 let authority_key_path = "test_starting_bridge_node_with_client_bridge_authority_key";
468 let server_listen_port = get_available_port("127.0.0.1");
469
470 let base64_encoded = kp.encode_base64();
471 std::fs::write(tmp_dir.join(authority_key_path), base64_encoded).unwrap();
472
473 let client_iota_address = IotaAddress::from(kp.public());
474 let sender_address = bridge_test_cluster.iota_user_address();
475 bridge_test_cluster
477 .test_cluster
478 .transfer_iota_must_exceed(sender_address, client_iota_address, 1000000000)
479 .await;
480
481 let config = BridgeNodeConfig {
482 server_listen_port,
483 metrics_port: get_available_port("127.0.0.1"),
484 bridge_authority_key_path: tmp_dir.join(authority_key_path),
485 iota: IotaConfig {
486 iota_rpc_url: bridge_test_cluster.iota_rpc_url(),
487 iota_bridge_chain_id: BridgeChainId::IotaCustom as u8,
488 bridge_client_key_path: None,
489 bridge_client_gas_object: None,
490 iota_bridge_module_last_processed_event_id_override: Some(EventID {
491 tx_digest: TransactionDigest::random(),
492 event_seq: 0,
493 }),
494 },
495 eth: EthConfig {
496 eth_rpc_url: bridge_test_cluster.eth_rpc_url(),
497 eth_bridge_proxy_address: bridge_test_cluster.iota_bridge_address(),
498 eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
499 eth_contracts_start_block_fallback: Some(0),
500 eth_contracts_start_block_override: None,
501 },
502 approved_governance_actions: vec![],
503 run_client: true,
504 db_path: Some(db_path),
505 };
506 let _handle = run_bridge_node(
508 config,
509 BridgeNodePublicMetadata::empty_for_testing(),
510 Registry::new(),
511 )
512 .await
513 .unwrap();
514
515 let server_url = format!("http://127.0.0.1:{}", server_listen_port);
516 let res = wait_for_server_to_be_up(server_url, 5).await;
520 res.unwrap();
521 }
522
523 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
524 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
525 async fn test_starting_bridge_node_with_client_and_separate_client_key() {
526 telemetry_subscribers::init_for_testing();
527 let bridge_test_cluster = setup().await;
528 let kp = bridge_test_cluster.bridge_authority_key(0);
529
530 let tmp_dir = tempdir().unwrap().into_path();
532 let db_path =
533 tmp_dir.join("test_starting_bridge_node_with_client_and_separate_client_key_db");
534 let authority_key_path =
535 "test_starting_bridge_node_with_client_and_separate_client_key_bridge_authority_key";
536 let server_listen_port = get_available_port("127.0.0.1");
537
538 let base64_encoded = kp.encode_base64();
540 std::fs::write(tmp_dir.join(authority_key_path), base64_encoded).unwrap();
541
542 let (_, kp): (_, Secp256k1KeyPair) = get_key_pair();
544 let kp = IotaKeyPair::from(kp);
545 let client_key_path =
546 "test_starting_bridge_node_with_client_and_separate_client_key_bridge_client_key";
547 std::fs::write(tmp_dir.join(client_key_path), kp.encode_base64()).unwrap();
548 let client_iota_address = IotaAddress::from(&kp.public());
549 let sender_address = bridge_test_cluster.iota_user_address();
550 let gas_obj = bridge_test_cluster
552 .test_cluster
553 .transfer_iota_must_exceed(sender_address, client_iota_address, 1000000000)
554 .await;
555
556 let config = BridgeNodeConfig {
557 server_listen_port,
558 metrics_port: get_available_port("127.0.0.1"),
559 bridge_authority_key_path: tmp_dir.join(authority_key_path),
560 iota: IotaConfig {
561 iota_rpc_url: bridge_test_cluster.iota_rpc_url(),
562 iota_bridge_chain_id: BridgeChainId::IotaCustom as u8,
563 bridge_client_key_path: Some(tmp_dir.join(client_key_path)),
564 bridge_client_gas_object: Some(gas_obj),
565 iota_bridge_module_last_processed_event_id_override: Some(EventID {
566 tx_digest: TransactionDigest::random(),
567 event_seq: 0,
568 }),
569 },
570 eth: EthConfig {
571 eth_rpc_url: bridge_test_cluster.eth_rpc_url(),
572 eth_bridge_proxy_address: bridge_test_cluster.iota_bridge_address(),
573 eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
574 eth_contracts_start_block_fallback: Some(0),
575 eth_contracts_start_block_override: Some(0),
576 },
577 approved_governance_actions: vec![],
578 run_client: true,
579 db_path: Some(db_path),
580 };
581 let _handle = run_bridge_node(
583 config,
584 BridgeNodePublicMetadata::empty_for_testing(),
585 Registry::new(),
586 )
587 .await
588 .unwrap();
589
590 let server_url = format!("http://127.0.0.1:{}", server_listen_port);
591 let res = wait_for_server_to_be_up(server_url, 5).await;
595 res.unwrap();
596 }
597
598 async fn setup() -> BridgeTestCluster {
599 BridgeTestClusterBuilder::new()
600 .with_eth_env(true)
601 .with_bridge_cluster(false)
602 .with_num_validators(2)
603 .build()
604 .await
605 }
606}