iota_bridge/node.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashMap,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::Arc,
    time::Duration,
};

use arc_swap::ArcSwap;
use ethers::types::Address as EthAddress;
use iota_metrics::spawn_logged_monitored_task;
use iota_types::{
    Identifier,
    bridge::{
        BRIDGE_COMMITTEE_MODULE_NAME, BRIDGE_LIMITER_MODULE_NAME, BRIDGE_MODULE_NAME,
        BRIDGE_TREASURY_MODULE_NAME,
    },
    event::EventID,
};
use tokio::task::JoinHandle;
use tracing::info;

use crate::{
    action_executor::BridgeActionExecutor,
    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
    config::{BridgeClientConfig, BridgeNodeConfig},
    eth_syncer::EthSyncer,
    events::init_all_struct_tags,
    iota_syncer::IotaSyncer,
    metrics::BridgeMetrics,
    monitor::BridgeMonitor,
    orchestrator::BridgeOrchestrator,
    server::{BridgeNodePublicMetadata, handler::BridgeRequestHandler, run_server},
    storage::BridgeOrchestratorTables,
};

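/// Starts a bridge node: registers the bridge event struct tags, validates the
/// config, spawns the client components when a client config is present (i.e.
/// `run_client` is set), and finally launches the server that handles bridge
/// requests. The returned handle belongs to the server task.
///
/// A minimal usage sketch (`load_config` and the config path are hypothetical
/// placeholders, not part of this crate):
///
/// ```ignore
/// let config: BridgeNodeConfig = load_config("bridge_node.yaml")?; // hypothetical helper
/// let handle = run_bridge_node(
///     config,
///     BridgeNodePublicMetadata::empty_for_testing(),
///     prometheus::Registry::new(),
/// )
/// .await?;
/// handle.await?;
/// ```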
pub async fn run_bridge_node(
    config: BridgeNodeConfig,
    metadata: BridgeNodePublicMetadata,
    prometheus_registry: prometheus::Registry,
) -> anyhow::Result<JoinHandle<()>> {
    init_all_struct_tags();
    let metrics = Arc::new(BridgeMetrics::new(&prometheus_registry));
    let (server_config, client_config) = config.validate(metrics.clone()).await?;

    // Start client components, if configured.
    let _handles = if let Some(client_config) = client_config {
        start_client_components(client_config, metrics.clone()).await
    } else {
        Ok(vec![])
    }?;

    // Start the server.
    let socket_address = SocketAddr::new(
        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
        server_config.server_listen_port,
    );
    Ok(run_server(
        &socket_address,
        BridgeRequestHandler::new(
            server_config.key,
            server_config.iota_client,
            server_config.eth_client,
            server_config.approved_governance_actions,
            metrics.clone(),
        ),
        metrics,
        Arc::new(metadata),
    ))
}

// TODO: is there a way to clean up the overrides after they're stored in the DB?
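/// Spawns all client-side tasks (the Ethereum and IOTA event syncers, the
/// bridge action executor, the monitor, and the orchestrator) and returns the
/// join handles of every spawned task.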
async fn start_client_components(
    client_config: BridgeClientConfig,
    metrics: Arc<BridgeMetrics>,
) -> anyhow::Result<Vec<JoinHandle<()>>> {
    let store = BridgeOrchestratorTables::new(&client_config.db_path.join("client"));
    let iota_modules_to_watch = get_iota_modules_to_watch(
        &store,
        client_config.iota_bridge_module_last_processed_event_id_override,
    );
    let eth_contracts_to_watch = get_eth_contracts_to_watch(
        &store,
        &client_config.eth_contracts,
        client_config.eth_contracts_start_block_fallback,
        client_config.eth_contracts_start_block_override,
    );

    let iota_client = client_config.iota_client.clone();

    let mut all_handles = vec![];
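    // Spawn the Ethereum and IOTA event syncers. Each returns its task handles
    // plus a receiver of events for the watched contracts/modules; the
    // orchestrator consumes both receivers below. The IOTA syncer is driven on
    // a two-second interval.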
    let (task_handles, eth_events_rx, _) =
        EthSyncer::new(client_config.eth_client.clone(), eth_contracts_to_watch)
            .run(metrics.clone())
            .await
            .expect("Failed to start eth syncer");
    all_handles.extend(task_handles);

    let (task_handles, iota_events_rx) =
        IotaSyncer::new(client_config.iota_client, iota_modules_to_watch)
            .run(Duration::from_secs(2))
            .await
            .expect("Failed to start iota syncer");
    all_handles.extend(task_handles);

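    // Fetch the current committee and wrap the authority aggregator in an
    // `ArcSwap`, presumably so it can be swapped at runtime (e.g. on committee
    // changes observed by the monitor) without restarting dependent tasks.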
    let committee = Arc::new(
        iota_client
            .get_bridge_committee()
            .await
            .expect("Failed to get committee"),
    );
    let bridge_auth_agg = Arc::new(ArcSwap::from(Arc::new(BridgeAuthorityAggregator::new(
        committee,
    ))));
    // TODO: should we use one query instead of two?
    let iota_token_type_tags = iota_client
        .get_token_id_map()
        .await
        .expect("Failed to get token id map");
    let is_bridge_paused = iota_client
        .is_bridge_paused()
        .await
        .expect("Failed to get bridge pause status");

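    // Propagate the on-chain pause status through a watch channel: the monitor
    // gets the sender, the action executor the receiver.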
    let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(is_bridge_paused);

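    // Metered channel feeding the monitor; in-flight messages are reported
    // under the `monitor_queue` label of the `channel_inflight` metric.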
    let (monitor_tx, monitor_rx) = iota_metrics::metered_channel::channel(
        10000,
        &iota_metrics::get_metrics()
            .unwrap()
            .channel_inflight
            .with_label_values(&["monitor_queue"]),
    );
    let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(iota_token_type_tags)));
    let bridge_action_executor = BridgeActionExecutor::new(
        iota_client.clone(),
        bridge_auth_agg.clone(),
        store.clone(),
        client_config.key,
        client_config.iota_address,
        client_config.gas_object_ref.0,
        iota_token_type_tags.clone(),
        bridge_pause_rx,
        metrics.clone(),
    )
    .await;

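    // The monitor consumes bridge events from `monitor_rx` and holds the
    // pause-switch sender plus the shared aggregator and token-map handles,
    // presumably so it can refresh them in response to on-chain changes.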
    let monitor = BridgeMonitor::new(
        iota_client.clone(),
        monitor_rx,
        bridge_auth_agg.clone(),
        bridge_pause_tx,
        iota_token_type_tags,
    );
    all_handles.push(spawn_logged_monitored_task!(monitor.run()));

    let orchestrator = BridgeOrchestrator::new(
        iota_client,
        iota_events_rx,
        eth_events_rx,
        store.clone(),
        monitor_tx,
        metrics,
    );

    all_handles.extend(orchestrator.run(bridge_action_executor).await);
    Ok(all_handles)
}

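/// Returns the IOTA bridge modules to watch, each paired with its starting
/// cursor. Cursor precedence: the config override (applied to every module) >
/// the cursor stored in the DB > `None`, which syncs the module from the
/// beginning.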
fn get_iota_modules_to_watch(
    store: &Arc<BridgeOrchestratorTables>,
    iota_bridge_module_last_processed_event_id_override: Option<EventID>,
) -> HashMap<Identifier, Option<EventID>> {
    let iota_bridge_modules = vec![
        BRIDGE_MODULE_NAME.to_owned(),
        BRIDGE_COMMITTEE_MODULE_NAME.to_owned(),
        BRIDGE_TREASURY_MODULE_NAME.to_owned(),
        BRIDGE_LIMITER_MODULE_NAME.to_owned(),
    ];
    if let Some(cursor) = iota_bridge_module_last_processed_event_id_override {
        info!("Overriding cursor for iota bridge modules to {:?}", cursor);
        return HashMap::from_iter(
            iota_bridge_modules
                .iter()
                .map(|module| (module.clone(), Some(cursor))),
        );
    }

    let iota_bridge_module_stored_cursor = store
        .get_iota_event_cursors(&iota_bridge_modules)
        .expect("Failed to get iota event cursors from storage");
    let mut iota_modules_to_watch = HashMap::new();
    for (module_identifier, cursor) in iota_bridge_modules
        .iter()
        .zip(iota_bridge_module_stored_cursor)
    {
        if cursor.is_none() {
            info!(
                "No cursor found for iota bridge module {} in storage or config override; querying will start from the beginning.",
                module_identifier
            );
        }
        iota_modules_to_watch.insert(module_identifier.clone(), cursor);
    }
    iota_modules_to_watch
}

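/// Returns the Ethereum contracts to watch, each paired with its start block.
/// Start block precedence: `eth_contracts_start_block_override` > stored
/// cursor + 1 (the stored value is the last processed block) >
/// `eth_contracts_start_block_fallback`.
///
/// A sketch of the resulting behavior, mirroring the unit tests below (the
/// `store` and `contracts` bindings and the concrete values are hypothetical):
///
/// ```ignore
/// // No override, no stored cursor: every contract starts at the fallback.
/// let m = get_eth_contracts_to_watch(&store, &contracts, 10, None);
/// assert!(m.values().all(|&b| b == 10));
/// // An override beats everything, stored cursors included.
/// let m = get_eth_contracts_to_watch(&store, &contracts, 10, Some(420));
/// assert!(m.values().all(|&b| b == 420));
/// ```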
fn get_eth_contracts_to_watch(
    store: &Arc<BridgeOrchestratorTables>,
    eth_contracts: &[EthAddress],
    eth_contracts_start_block_fallback: u64,
    eth_contracts_start_block_override: Option<u64>,
) -> HashMap<EthAddress, u64> {
    let stored_eth_cursors = store
        .get_eth_event_cursors(eth_contracts)
        .expect("Failed to get eth event cursors from storage");
    let mut eth_contracts_to_watch = HashMap::new();
    for (contract, stored_cursor) in eth_contracts.iter().zip(stored_eth_cursors) {
        // Start block precedence:
        // eth_contracts_start_block_override > stored cursor >
        // eth_contracts_start_block_fallback
        match (eth_contracts_start_block_override, stored_cursor) {
            (Some(override_), _) => {
                eth_contracts_to_watch.insert(*contract, override_);
                info!(
                    "Overriding cursor for eth bridge contract {} to {}. Stored cursor: {:?}",
                    contract, override_, stored_cursor
                );
            }
            (None, Some(stored_cursor)) => {
                // +1: The stored value is the last block that was processed, so we start from
                // the next block.
                eth_contracts_to_watch.insert(*contract, stored_cursor + 1);
            }
            (None, None) => {
                // If no cursor is found, start from the fallback block.
                eth_contracts_to_watch.insert(*contract, eth_contracts_start_block_fallback);
            }
        }
    }
    eth_contracts_to_watch
}

#[cfg(test)]
mod tests {
    use ethers::types::Address as EthAddress;
    use fastcrypto::secp256k1::Secp256k1KeyPair;
    use iota_config::local_ip_utils::get_available_port;
    use iota_types::{
        base_types::IotaAddress,
        bridge::BridgeChainId,
        crypto::{EncodeDecodeBase64, IotaKeyPair, KeypairTraits, get_key_pair},
        digests::TransactionDigest,
        event::EventID,
    };
    use prometheus::Registry;
    use tempfile::tempdir;

    use super::*;
    use crate::{
        config::{BridgeNodeConfig, EthConfig, IotaConfig},
        e2e_tests::test_utils::{BridgeTestCluster, BridgeTestClusterBuilder},
        utils::wait_for_server_to_be_up,
    };

    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_get_eth_contracts_to_watch() {
        telemetry_subscribers::init_for_testing();
        let temp_dir = tempdir().unwrap();
        let eth_contracts = vec![
            EthAddress::from_low_u64_be(1),
            EthAddress::from_low_u64_be(2),
        ];
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        // No override, no watermark found in DB: use the fallback.
        let contracts = get_eth_contracts_to_watch(&store, &eth_contracts, 10, None);
        assert_eq!(
            contracts,
            vec![(eth_contracts[0], 10), (eth_contracts[1], 10)]
                .into_iter()
                .collect::<HashMap<_, _>>()
        );

        // No watermark found in DB: use the override.
        let contracts = get_eth_contracts_to_watch(&store, &eth_contracts, 10, Some(420));
        assert_eq!(
            contracts,
            vec![(eth_contracts[0], 420), (eth_contracts[1], 420)]
                .into_iter()
                .collect::<HashMap<_, _>>()
        );

        store
            .update_eth_event_cursor(eth_contracts[0], 100)
            .unwrap();
        store
            .update_eth_event_cursor(eth_contracts[1], 102)
            .unwrap();

        // No override, watermarks found in DB: use watermark + 1.
        let contracts = get_eth_contracts_to_watch(&store, &eth_contracts, 10, None);
        assert_eq!(
            contracts,
            vec![(eth_contracts[0], 101), (eth_contracts[1], 103)]
                .into_iter()
                .collect::<HashMap<_, _>>()
        );

        // Watermarks found in DB, but the override still wins.
        let contracts = get_eth_contracts_to_watch(&store, &eth_contracts, 10, Some(200));
        assert_eq!(
            contracts,
            vec![(eth_contracts[0], 200), (eth_contracts[1], 200)]
                .into_iter()
                .collect::<HashMap<_, _>>()
        );
    }

    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_get_iota_modules_to_watch() {
        telemetry_subscribers::init_for_testing();
        let temp_dir = tempdir().unwrap();

        let store = BridgeOrchestratorTables::new(temp_dir.path());
        let bridge_module = BRIDGE_MODULE_NAME.to_owned();
        let committee_module = BRIDGE_COMMITTEE_MODULE_NAME.to_owned();
        let treasury_module = BRIDGE_TREASURY_MODULE_NAME.to_owned();
        let limiter_module = BRIDGE_LIMITER_MODULE_NAME.to_owned();
        // No override, no stored watermark: every cursor is None.
        let iota_modules_to_watch = get_iota_modules_to_watch(&store, None);
        assert_eq!(
            iota_modules_to_watch,
            vec![
                (bridge_module.clone(), None),
                (committee_module.clone(), None),
                (treasury_module.clone(), None),
                (limiter_module.clone(), None)
            ]
            .into_iter()
            .collect::<HashMap<_, _>>()
        );

        // No stored watermark: use the override.
        let override_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 42,
        };
        let iota_modules_to_watch = get_iota_modules_to_watch(&store, Some(override_cursor));
        assert_eq!(
            iota_modules_to_watch,
            vec![
                (bridge_module.clone(), Some(override_cursor)),
                (committee_module.clone(), Some(override_cursor)),
                (treasury_module.clone(), Some(override_cursor)),
                (limiter_module.clone(), Some(override_cursor))
            ]
            .into_iter()
            .collect::<HashMap<_, _>>()
        );

        // No override, stored watermark for the `bridge` module: use the stored
        // watermark for `bridge` and None for the remaining modules.
        let stored_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 100,
        };
        store
            .update_iota_event_cursor(bridge_module.clone(), stored_cursor)
            .unwrap();
        let iota_modules_to_watch = get_iota_modules_to_watch(&store, None);
        assert_eq!(
            iota_modules_to_watch,
            vec![
                (bridge_module.clone(), Some(stored_cursor)),
                (committee_module.clone(), None),
                (treasury_module.clone(), None),
                (limiter_module.clone(), None)
            ]
            .into_iter()
            .collect::<HashMap<_, _>>()
        );

        // Stored watermarks exist, but the override still wins.
        let stored_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 100,
        };
        store
            .update_iota_event_cursor(committee_module.clone(), stored_cursor)
            .unwrap();
        let iota_modules_to_watch = get_iota_modules_to_watch(&store, Some(override_cursor));
        assert_eq!(
            iota_modules_to_watch,
            vec![
                (bridge_module.clone(), Some(override_cursor)),
                (committee_module.clone(), Some(override_cursor)),
                (treasury_module.clone(), Some(override_cursor)),
                (limiter_module.clone(), Some(override_cursor))
            ]
            .into_iter()
            .collect::<HashMap<_, _>>()
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_starting_bridge_node() {
        telemetry_subscribers::init_for_testing();
        let bridge_test_cluster = setup().await;
        let kp = bridge_test_cluster.bridge_authority_key(0);

        // Prepare the node config (server only).
        let tmp_dir = tempdir().unwrap().into_path();
        let authority_key_path = "test_starting_bridge_node_bridge_authority_key";
        let server_listen_port = get_available_port("127.0.0.1");
        let base64_encoded = kp.encode_base64();
        std::fs::write(tmp_dir.join(authority_key_path), base64_encoded).unwrap();

        let config = BridgeNodeConfig {
            server_listen_port,
            metrics_port: get_available_port("127.0.0.1"),
            bridge_authority_key_path: tmp_dir.join(authority_key_path),
            iota: IotaConfig {
                iota_rpc_url: bridge_test_cluster.iota_rpc_url(),
                iota_bridge_chain_id: BridgeChainId::IotaCustom as u8,
                bridge_client_key_path: None,
                bridge_client_gas_object: None,
                iota_bridge_module_last_processed_event_id_override: None,
            },
            eth: EthConfig {
                eth_rpc_url: bridge_test_cluster.eth_rpc_url(),
                eth_bridge_proxy_address: bridge_test_cluster.iota_bridge_address(),
                eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
                eth_contracts_start_block_fallback: None,
                eth_contracts_start_block_override: None,
            },
            approved_governance_actions: vec![],
            run_client: false,
            db_path: None,
        };
        // Spawn the bridge node in memory.
        let _handle = run_bridge_node(
            config,
            BridgeNodePublicMetadata::empty_for_testing(),
            Registry::new(),
        )
        .await
        .unwrap();

        let server_url = format!("http://127.0.0.1:{}", server_listen_port);
        // Now we expect the server to be up and running.
        let res = wait_for_server_to_be_up(server_url, 5).await;
        res.unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_starting_bridge_node_with_client() {
        telemetry_subscribers::init_for_testing();
        let bridge_test_cluster = setup().await;
        let kp = bridge_test_cluster.bridge_authority_key(0);

        // Prepare the node config (server + client).
        let tmp_dir = tempdir().unwrap().into_path();
        let db_path = tmp_dir.join("test_starting_bridge_node_with_client_db");
        let authority_key_path = "test_starting_bridge_node_with_client_bridge_authority_key";
        let server_listen_port = get_available_port("127.0.0.1");

        let base64_encoded = kp.encode_base64();
        std::fs::write(tmp_dir.join(authority_key_path), base64_encoded).unwrap();

        let client_iota_address = IotaAddress::from(kp.public());
        let sender_address = bridge_test_cluster.iota_user_address();
        // Send some gas to this address.
        bridge_test_cluster
            .test_cluster
            .transfer_iota_must_exceed(sender_address, client_iota_address, 1000000000)
            .await;

        let config = BridgeNodeConfig {
            server_listen_port,
            metrics_port: get_available_port("127.0.0.1"),
            bridge_authority_key_path: tmp_dir.join(authority_key_path),
            iota: IotaConfig {
                iota_rpc_url: bridge_test_cluster.iota_rpc_url(),
                iota_bridge_chain_id: BridgeChainId::IotaCustom as u8,
                bridge_client_key_path: None,
                bridge_client_gas_object: None,
                iota_bridge_module_last_processed_event_id_override: Some(EventID {
                    tx_digest: TransactionDigest::random(),
                    event_seq: 0,
                }),
            },
            eth: EthConfig {
                eth_rpc_url: bridge_test_cluster.eth_rpc_url(),
                eth_bridge_proxy_address: bridge_test_cluster.iota_bridge_address(),
                eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
                eth_contracts_start_block_fallback: Some(0),
                eth_contracts_start_block_override: None,
            },
            approved_governance_actions: vec![],
            run_client: true,
            db_path: Some(db_path),
        };
        // Spawn the bridge node in memory.
        let _handle = run_bridge_node(
            config,
            BridgeNodePublicMetadata::empty_for_testing(),
            Registry::new(),
        )
        .await
        .unwrap();

        let server_url = format!("http://127.0.0.1:{}", server_listen_port);
        // Now we expect the server to be up and running. The client components
        // are spawned before the server, so once the server is up we know the
        // client components are already running.
        let res = wait_for_server_to_be_up(server_url, 5).await;
        res.unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_starting_bridge_node_with_client_and_separate_client_key() {
        telemetry_subscribers::init_for_testing();
        let bridge_test_cluster = setup().await;
        let kp = bridge_test_cluster.bridge_authority_key(0);

        // Prepare the node config (server + client).
        let tmp_dir = tempdir().unwrap().into_path();
        let db_path =
            tmp_dir.join("test_starting_bridge_node_with_client_and_separate_client_key_db");
        let authority_key_path =
            "test_starting_bridge_node_with_client_and_separate_client_key_bridge_authority_key";
        let server_listen_port = get_available_port("127.0.0.1");

        // Prepare the bridge authority key.
        let base64_encoded = kp.encode_base64();
        std::fs::write(tmp_dir.join(authority_key_path), base64_encoded).unwrap();

        // Prepare the bridge client key.
        let (_, kp): (_, Secp256k1KeyPair) = get_key_pair();
        let kp = IotaKeyPair::from(kp);
        let client_key_path =
            "test_starting_bridge_node_with_client_and_separate_client_key_bridge_client_key";
        std::fs::write(tmp_dir.join(client_key_path), kp.encode_base64()).unwrap();
        let client_iota_address = IotaAddress::from(&kp.public());
        let sender_address = bridge_test_cluster.iota_user_address();
        // Send some gas to this address.
        let gas_obj = bridge_test_cluster
            .test_cluster
            .transfer_iota_must_exceed(sender_address, client_iota_address, 1000000000)
            .await;

        let config = BridgeNodeConfig {
            server_listen_port,
            metrics_port: get_available_port("127.0.0.1"),
            bridge_authority_key_path: tmp_dir.join(authority_key_path),
            iota: IotaConfig {
                iota_rpc_url: bridge_test_cluster.iota_rpc_url(),
                iota_bridge_chain_id: BridgeChainId::IotaCustom as u8,
                bridge_client_key_path: Some(tmp_dir.join(client_key_path)),
                bridge_client_gas_object: Some(gas_obj),
                iota_bridge_module_last_processed_event_id_override: Some(EventID {
                    tx_digest: TransactionDigest::random(),
                    event_seq: 0,
                }),
            },
            eth: EthConfig {
                eth_rpc_url: bridge_test_cluster.eth_rpc_url(),
                eth_bridge_proxy_address: bridge_test_cluster.iota_bridge_address(),
                eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
                eth_contracts_start_block_fallback: Some(0),
                eth_contracts_start_block_override: Some(0),
            },
            approved_governance_actions: vec![],
            run_client: true,
            db_path: Some(db_path),
        };
        // Spawn the bridge node in memory.
        let _handle = run_bridge_node(
            config,
            BridgeNodePublicMetadata::empty_for_testing(),
            Registry::new(),
        )
        .await
        .unwrap();

        let server_url = format!("http://127.0.0.1:{}", server_listen_port);
        // Now we expect the server to be up and running. The client components
        // are spawned before the server, so once the server is up we know the
        // client components are already running.
        let res = wait_for_server_to_be_up(server_url, 5).await;
        res.unwrap();
    }

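    /// Builds a `BridgeTestCluster` with an Ethereum environment and two
    /// validators but without a pre-started bridge cluster, so each test above
    /// can launch its own bridge node.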
    async fn setup() -> BridgeTestCluster {
        BridgeTestClusterBuilder::new()
            .with_eth_env(true)
            .with_bridge_cluster(false)
            .with_num_validators(2)
            .build()
            .await
    }
}