iota_bridge/
monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! `BridgeMonitor` receives all `IotaBridgeEvent` and handles them accordingly.
6
7use std::{collections::HashMap, sync::Arc};
8
9use arc_swap::ArcSwap;
10use iota_types::TypeTag;
11use tokio::time::Duration;
12use tracing::{error, info, warn};
13
14use crate::{
15    client::bridge_authority_aggregator::BridgeAuthorityAggregator,
16    crypto::BridgeAuthorityPublicKeyBytes,
17    events::{
18        BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent, EmergencyOpEvent, IotaBridgeEvent,
19    },
20    iota_client::{IotaClient, IotaClientInner},
21    retry_with_max_elapsed_time,
22    types::{BridgeCommittee, IsBridgePaused},
23};
24
25const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
26
27pub struct BridgeMonitor<C> {
28    iota_client: Arc<IotaClient<C>>,
29    monitor_rx: iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
30    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
31    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
32    iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
33}
34
35impl<C> BridgeMonitor<C>
36where
37    C: IotaClientInner + 'static,
38{
39    pub fn new(
40        iota_client: Arc<IotaClient<C>>,
41        monitor_rx: iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
42        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
43        bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
44        iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
45    ) -> Self {
46        Self {
47            iota_client,
48            monitor_rx,
49            bridge_auth_agg,
50            bridge_paused_watch_tx,
51            iota_token_type_tags,
52        }
53    }
54
55    pub async fn run(self) {
56        tracing::info!("Starting BridgeMonitor");
57        let Self {
58            iota_client,
59            mut monitor_rx,
60            bridge_auth_agg,
61            bridge_paused_watch_tx,
62            iota_token_type_tags,
63        } = self;
64        let mut latest_token_config = (*iota_token_type_tags.load().clone()).clone();
65
66        while let Some(events) = monitor_rx.recv().await {
67            match events {
68                IotaBridgeEvent::IotaToEthTokenBridgeV1(_) => (),
69                IotaBridgeEvent::TokenTransferApproved(_) => (),
70                IotaBridgeEvent::TokenTransferClaimed(_) => (),
71                IotaBridgeEvent::TokenTransferAlreadyApproved(_) => (),
72                IotaBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
73                IotaBridgeEvent::TokenTransferLimitExceed(_) => {
74                    // TODO
75                }
76
77                IotaBridgeEvent::EmergencyOpEvent(event) => {
78                    info!("Received EmergencyOpEvent: {:?}", event);
79                    let is_paused = get_latest_bridge_pause_status_with_emergency_event(
80                        iota_client.clone(),
81                        event,
82                        Duration::from_secs(10),
83                    )
84                    .await;
85                    bridge_paused_watch_tx
86                        .send(is_paused)
87                        .expect("Bridge pause status watch channel should not be closed");
88                }
89
90                IotaBridgeEvent::CommitteeMemberRegistration(_) => (),
91                IotaBridgeEvent::CommitteeUpdateEvent(_) => (),
92
93                IotaBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
94                    info!("Received CommitteeMemberUrlUpdateEvent: {:?}", event);
95                    let new_committee = get_latest_bridge_committee_with_url_update_event(
96                        iota_client.clone(),
97                        event,
98                        Duration::from_secs(10),
99                    )
100                    .await;
101                    bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new(
102                        new_committee,
103                    ))));
104                    info!("Committee updated with CommitteeMemberUrlUpdateEvent");
105                }
106
107                IotaBridgeEvent::BlocklistValidatorEvent(event) => {
108                    info!("Received BlocklistValidatorEvent: {:?}", event);
109                    let new_committee = get_latest_bridge_committee_with_blocklist_event(
110                        iota_client.clone(),
111                        event,
112                        Duration::from_secs(10),
113                    )
114                    .await;
115                    bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new(
116                        new_committee,
117                    ))));
118                    info!("Committee updated with BlocklistValidatorEvent");
119                }
120
121                IotaBridgeEvent::TokenRegistrationEvent(_) => (),
122
123                IotaBridgeEvent::NewTokenEvent(event) => {
124                    if let std::collections::hash_map::Entry::Vacant(entry) =
125                        // We only add new tokens but not remove so it's ok to just insert
126                        latest_token_config.entry(event.token_id)
127                    {
128                        entry.insert(event.type_name.clone());
129                        iota_token_type_tags.store(Arc::new(latest_token_config.clone()));
130                    } else {
131                        // invariant
132                        assert_eq!(event.type_name, latest_token_config[&event.token_id]);
133                    }
134                }
135
136                IotaBridgeEvent::UpdateTokenPriceEvent(_) => (),
137            }
138        }
139
140        panic!("BridgeMonitor channel was closed unexpectedly");
141    }
142}
143
144async fn get_latest_bridge_committee_with_url_update_event<C: IotaClientInner>(
145    iota_client: Arc<IotaClient<C>>,
146    event: CommitteeMemberUrlUpdateEvent,
147    staleness_retry_interval: Duration,
148) -> BridgeCommittee {
149    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
150    loop {
151        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
152            iota_client.get_bridge_committee(),
153            Duration::from_secs(600)
154        ) else {
155            error!("Failed to get bridge committee after retry");
156            continue;
157        };
158        let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
159        let Some(member) = member else {
160            // This is possible when a node is processing an older event while the member
161            // quit at a later point, which is fine. Or fullnode returns a stale
162            // committee that the member hasn't joined, which is rare and tricy to handle so
163            // we just log it.
164            warn!(
165                "Committee member not found in the committee: {:?}",
166                event.member
167            );
168            return committee;
169        };
170        if member.base_url == event.new_url {
171            return committee;
172        }
173        // If url does not match, it could be:
174        // 1. the query is sent to a stale fullnode that does not have the latest data
175        //    yet
176        // 2. the node is processing an older message, and the latest url has changed
177        //    again
178        // In either case, we retry a few times. If it still fails to match, we assume
179        // it's the latter case.
180        tokio::time::sleep(staleness_retry_interval).await;
181        remaining_retry_times -= 1;
182        if remaining_retry_times == 0 {
183            warn!(
184                "Committee member url {:?} does not match onchain record {:?} after retry",
185                event.member, member
186            );
187            return committee;
188        }
189    }
190}
191
192async fn get_latest_bridge_committee_with_blocklist_event<C: IotaClientInner>(
193    iota_client: Arc<IotaClient<C>>,
194    event: BlocklistValidatorEvent,
195    staleness_retry_interval: Duration,
196) -> BridgeCommittee {
197    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
198    loop {
199        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
200            iota_client.get_bridge_committee(),
201            Duration::from_secs(600)
202        ) else {
203            error!("Failed to get bridge committee after retry");
204            continue;
205        };
206        let mut any_mismatch = false;
207        for pk in &event.public_keys {
208            let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
209            let Some(member) = member else {
210                // This is possible when a node is processing an older event while the member
211                // quit at a later point. Or fullnode returns a stale committee that
212                // the member hasn't joined.
213                warn!("Committee member not found in the committee: {:?}", pk);
214                any_mismatch = true;
215                break;
216            };
217            if member.is_blocklisted != event.blocklisted {
218                warn!(
219                    "Committee member blocklist status does not match onchain record: {:?}",
220                    member
221                );
222                any_mismatch = true;
223                break;
224            }
225        }
226        if !any_mismatch {
227            return committee;
228        }
229        // If there is any match, it could be:
230        // 1. the query is sent to a stale fullnode that does not have the latest data
231        //    yet
232        // 2. the node is processing an older message, and the latest blocklist status
233        //    has changed again
234        // In either case, we retry a few times. If it still fails to match, we assume
235        // it's the latter case.
236        tokio::time::sleep(staleness_retry_interval).await;
237        remaining_retry_times -= 1;
238        if remaining_retry_times == 0 {
239            warn!(
240                "Committee member blocklist status {:?} does not match onchain record after retry",
241                event
242            );
243            return committee;
244        }
245    }
246}
247
248async fn get_latest_bridge_pause_status_with_emergency_event<C: IotaClientInner>(
249    iota_client: Arc<IotaClient<C>>,
250    event: EmergencyOpEvent,
251    staleness_retry_interval: Duration,
252) -> IsBridgePaused {
253    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
254    loop {
255        let Ok(Ok(summary)) = retry_with_max_elapsed_time!(
256            iota_client.get_bridge_summary(),
257            Duration::from_secs(600)
258        ) else {
259            error!("Failed to get bridge summary after retry");
260            continue;
261        };
262        if summary.is_frozen == event.frozen {
263            return summary.is_frozen;
264        }
265        // If the onchain status does not match, it could be:
266        // 1. the query is sent to a stale fullnode that does not have the latest data
267        //    yet
268        // 2. the node is processing an older message, and the latest status has changed
269        //    again
270        // In either case, we retry a few times. If it still fails to match, we assume
271        // it's the latter case.
272        tokio::time::sleep(staleness_retry_interval).await;
273        remaining_retry_times -= 1;
274        if remaining_retry_times == 0 {
275            warn!(
276                "Bridge pause status {:?} does not match onchain record {:?} after retry",
277                event, summary.is_frozen
278            );
279            return summary.is_frozen;
280        }
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use std::str::FromStr;
287
288    use fastcrypto::traits::KeyPair;
289    use iota_types::{
290        base_types::IotaAddress,
291        bridge::{BridgeCommitteeSummary, MoveTypeCommitteeMember},
292        crypto::{ToFromBytes, get_key_pair},
293    };
294    use prometheus::Registry;
295
296    use super::*;
297    use crate::{
298        events::{NewTokenEvent, init_all_struct_tags},
299        iota_mock_client::IotaMockClient,
300        test_utils::{bridge_committee_to_bridge_committee_summary, get_test_authority_and_key},
301        types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority, BridgeCommittee},
302    };
303
304    #[tokio::test]
305    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
306    async fn test_get_latest_bridge_committee_with_url_update_event() {
307        telemetry_subscribers::init_for_testing();
308        let iota_client_mock = IotaMockClient::default();
309        let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
310        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
311        let pk = kp.public().clone();
312        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
313        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
314        let event = CommitteeMemberUrlUpdateEvent {
315            member: pk,
316            new_url: "http://new.url".to_string(),
317        };
318        let summary = BridgeCommitteeSummary {
319            members: vec![(
320                pk_bytes.clone(),
321                MoveTypeCommitteeMember {
322                    iota_address: IotaAddress::random_for_testing_only(),
323                    bridge_pubkey_bytes: pk_bytes.clone(),
324                    voting_power: 10000,
325                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
326                    blocklisted: false,
327                },
328            )],
329            member_registration: vec![],
330            last_committee_update_epoch: 0,
331        };
332
333        // Test the regular case, the onchain url matches
334        iota_client_mock.set_bridge_committee(summary.clone());
335        let timer = std::time::Instant::now();
336        let committee = get_latest_bridge_committee_with_url_update_event(
337            iota_client.clone(),
338            event.clone(),
339            Duration::from_secs(2),
340        )
341        .await;
342        assert_eq!(
343            committee.member(&pk_as_bytes).unwrap().base_url,
344            "http://new.url"
345        );
346        assert!(timer.elapsed().as_millis() < 500);
347
348        // Test the case where the onchain url is older. Then update onchain url in 1
349        // second. Since the retry interval is 2 seconds, it should return the
350        // next retry.
351        let old_summary = BridgeCommitteeSummary {
352            members: vec![(
353                pk_bytes.clone(),
354                MoveTypeCommitteeMember {
355                    iota_address: IotaAddress::random_for_testing_only(),
356                    bridge_pubkey_bytes: pk_bytes.clone(),
357                    voting_power: 10000,
358                    http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
359                    blocklisted: false,
360                },
361            )],
362            member_registration: vec![],
363            last_committee_update_epoch: 0,
364        };
365        iota_client_mock.set_bridge_committee(old_summary.clone());
366        let timer = std::time::Instant::now();
367        // update the url to "http://new.url" in 1 second
368        let iota_client_mock_clone = iota_client_mock.clone();
369        tokio::spawn(async move {
370            tokio::time::sleep(Duration::from_secs(1)).await;
371            iota_client_mock_clone.set_bridge_committee(summary.clone());
372        });
373        let committee = get_latest_bridge_committee_with_url_update_event(
374            iota_client.clone(),
375            event.clone(),
376            Duration::from_secs(2),
377        )
378        .await;
379        assert_eq!(
380            committee.member(&pk_as_bytes).unwrap().base_url,
381            "http://new.url"
382        );
383        let elapsed = timer.elapsed().as_millis();
384        assert!(elapsed > 1000 && elapsed < 3000);
385
386        // Test the case where the onchain url is newer. It should retry up to
387        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
388        let newer_summary = BridgeCommitteeSummary {
389            members: vec![(
390                pk_bytes.clone(),
391                MoveTypeCommitteeMember {
392                    iota_address: IotaAddress::random_for_testing_only(),
393                    bridge_pubkey_bytes: pk_bytes.clone(),
394                    voting_power: 10000,
395                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
396                    blocklisted: false,
397                },
398            )],
399            member_registration: vec![],
400            last_committee_update_epoch: 0,
401        };
402        iota_client_mock.set_bridge_committee(newer_summary.clone());
403        let timer = std::time::Instant::now();
404        let committee = get_latest_bridge_committee_with_url_update_event(
405            iota_client.clone(),
406            event.clone(),
407            Duration::from_millis(500),
408        )
409        .await;
410        assert_eq!(
411            committee.member(&pk_as_bytes).unwrap().base_url,
412            "http://newer.url"
413        );
414        let elapsed = timer.elapsed().as_millis();
415        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
416
417        // Test the case where the member is not found in the committee
418        // It should return the onchain record.
419        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
420        let pk2 = kp2.public().clone();
421        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
422        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
423        let newer_summary = BridgeCommitteeSummary {
424            members: vec![(
425                pk_bytes2.clone(),
426                MoveTypeCommitteeMember {
427                    iota_address: IotaAddress::random_for_testing_only(),
428                    bridge_pubkey_bytes: pk_bytes2.clone(),
429                    voting_power: 10000,
430                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
431                    blocklisted: false,
432                },
433            )],
434            member_registration: vec![],
435            last_committee_update_epoch: 0,
436        };
437        iota_client_mock.set_bridge_committee(newer_summary.clone());
438        let timer = std::time::Instant::now();
439        let committee = get_latest_bridge_committee_with_url_update_event(
440            iota_client.clone(),
441            event.clone(),
442            Duration::from_secs(1),
443        )
444        .await;
445        assert_eq!(
446            committee.member(&pk_as_bytes2).unwrap().base_url,
447            "http://newer.url"
448        );
449        assert!(committee.member(&pk_as_bytes).is_none());
450        let elapsed = timer.elapsed().as_millis();
451        assert!(elapsed < 1000);
452    }
453
454    #[tokio::test]
455    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
456    async fn test_get_latest_bridge_committee_with_blocklist_event() {
457        telemetry_subscribers::init_for_testing();
458        let iota_client_mock = IotaMockClient::default();
459        let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
460        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
461        let pk = kp.public().clone();
462        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
463        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
464
465        // Test the case where the onchain status is the same as the event (blocklisted)
466        let event = BlocklistValidatorEvent {
467            blocklisted: true,
468            public_keys: vec![pk.clone()],
469        };
470        let summary = BridgeCommitteeSummary {
471            members: vec![(
472                pk_bytes.clone(),
473                MoveTypeCommitteeMember {
474                    iota_address: IotaAddress::random_for_testing_only(),
475                    bridge_pubkey_bytes: pk_bytes.clone(),
476                    voting_power: 10000,
477                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
478                    blocklisted: true,
479                },
480            )],
481            member_registration: vec![],
482            last_committee_update_epoch: 0,
483        };
484        iota_client_mock.set_bridge_committee(summary.clone());
485        let timer = std::time::Instant::now();
486        let committee = get_latest_bridge_committee_with_blocklist_event(
487            iota_client.clone(),
488            event.clone(),
489            Duration::from_secs(2),
490        )
491        .await;
492        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
493        assert!(timer.elapsed().as_millis() < 500);
494
495        // Test the case where the onchain status is the same as the event
496        // (unblocklisted)
497        let event = BlocklistValidatorEvent {
498            blocklisted: false,
499            public_keys: vec![pk.clone()],
500        };
501        let summary = BridgeCommitteeSummary {
502            members: vec![(
503                pk_bytes.clone(),
504                MoveTypeCommitteeMember {
505                    iota_address: IotaAddress::random_for_testing_only(),
506                    bridge_pubkey_bytes: pk_bytes.clone(),
507                    voting_power: 10000,
508                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
509                    blocklisted: false,
510                },
511            )],
512            member_registration: vec![],
513            last_committee_update_epoch: 0,
514        };
515        iota_client_mock.set_bridge_committee(summary.clone());
516        let timer = std::time::Instant::now();
517        let committee = get_latest_bridge_committee_with_blocklist_event(
518            iota_client.clone(),
519            event.clone(),
520            Duration::from_secs(2),
521        )
522        .await;
523        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
524        assert!(timer.elapsed().as_millis() < 500);
525
526        // Test the case where the onchain status is older. Then update onchain status
527        // in 1 second. Since the retry interval is 2 seconds, it should return
528        // the next retry.
529        let old_summary = BridgeCommitteeSummary {
530            members: vec![(
531                pk_bytes.clone(),
532                MoveTypeCommitteeMember {
533                    iota_address: IotaAddress::random_for_testing_only(),
534                    bridge_pubkey_bytes: pk_bytes.clone(),
535                    voting_power: 10000,
536                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
537                    blocklisted: true,
538                },
539            )],
540            member_registration: vec![],
541            last_committee_update_epoch: 0,
542        };
543        iota_client_mock.set_bridge_committee(old_summary.clone());
544        let timer = std::time::Instant::now();
545        // update unblocklisted in 1 second
546        let iota_client_mock_clone = iota_client_mock.clone();
547        tokio::spawn(async move {
548            tokio::time::sleep(Duration::from_secs(1)).await;
549            iota_client_mock_clone.set_bridge_committee(summary.clone());
550        });
551        let committee = get_latest_bridge_committee_with_blocklist_event(
552            iota_client.clone(),
553            event.clone(),
554            Duration::from_secs(2),
555        )
556        .await;
557        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
558        let elapsed = timer.elapsed().as_millis();
559        assert!(elapsed > 1000 && elapsed < 3000);
560
561        // Test the case where the onchain url is newer. It should retry up to
562        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
563        let newer_summary = BridgeCommitteeSummary {
564            members: vec![(
565                pk_bytes.clone(),
566                MoveTypeCommitteeMember {
567                    iota_address: IotaAddress::random_for_testing_only(),
568                    bridge_pubkey_bytes: pk_bytes.clone(),
569                    voting_power: 10000,
570                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
571                    blocklisted: true,
572                },
573            )],
574            member_registration: vec![],
575            last_committee_update_epoch: 0,
576        };
577        iota_client_mock.set_bridge_committee(newer_summary.clone());
578        let timer = std::time::Instant::now();
579        let committee = get_latest_bridge_committee_with_blocklist_event(
580            iota_client.clone(),
581            event.clone(),
582            Duration::from_millis(500),
583        )
584        .await;
585        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
586        let elapsed = timer.elapsed().as_millis();
587        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
588
589        // Test the case where the member onchain url is not found in the committee
590        // It should return the onchain record after retrying a few times.
591        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
592        let pk2 = kp2.public().clone();
593        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
594        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
595        let summary = BridgeCommitteeSummary {
596            members: vec![(
597                pk_bytes2.clone(),
598                MoveTypeCommitteeMember {
599                    iota_address: IotaAddress::random_for_testing_only(),
600                    bridge_pubkey_bytes: pk_bytes2.clone(),
601                    voting_power: 10000,
602                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
603                    blocklisted: false,
604                },
605            )],
606            member_registration: vec![],
607            last_committee_update_epoch: 0,
608        };
609        iota_client_mock.set_bridge_committee(summary.clone());
610        let timer = std::time::Instant::now();
611        let committee = get_latest_bridge_committee_with_blocklist_event(
612            iota_client.clone(),
613            event.clone(),
614            Duration::from_secs(1),
615        )
616        .await;
617        assert_eq!(
618            committee.member(&pk_as_bytes2).unwrap().base_url,
619            "http://newer.url"
620        );
621        assert!(committee.member(&pk_as_bytes).is_none());
622        let elapsed = timer.elapsed().as_millis();
623        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
624
625        // Test any mismtach in the blocklist status should retry a few times
626        let event = BlocklistValidatorEvent {
627            blocklisted: true,
628            public_keys: vec![pk, pk2],
629        };
630        let summary = BridgeCommitteeSummary {
631            members: vec![
632                (
633                    pk_bytes.clone(),
634                    MoveTypeCommitteeMember {
635                        iota_address: IotaAddress::random_for_testing_only(),
636                        bridge_pubkey_bytes: pk_bytes.clone(),
637                        voting_power: 5000,
638                        http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
639                        blocklisted: true,
640                    },
641                ),
642                (
643                    pk_bytes2.clone(),
644                    MoveTypeCommitteeMember {
645                        iota_address: IotaAddress::random_for_testing_only(),
646                        bridge_pubkey_bytes: pk_bytes2.clone(),
647                        voting_power: 5000,
648                        http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
649                        blocklisted: false,
650                    },
651                ),
652            ],
653            member_registration: vec![],
654            last_committee_update_epoch: 0,
655        };
656        iota_client_mock.set_bridge_committee(summary.clone());
657        let timer = std::time::Instant::now();
658        let committee = get_latest_bridge_committee_with_blocklist_event(
659            iota_client.clone(),
660            event.clone(),
661            Duration::from_millis(500),
662        )
663        .await;
664        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
665        assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
666        let elapsed = timer.elapsed().as_millis();
667        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
668    }
669
670    #[tokio::test]
671    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
672    async fn test_get_bridge_pause_status_with_emergency_event() {
673        telemetry_subscribers::init_for_testing();
674        let iota_client_mock = IotaMockClient::default();
675        let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
676
677        // Test event and onchain status match
678        let event = EmergencyOpEvent { frozen: true };
679        iota_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
680        let timer = std::time::Instant::now();
681        assert!(
682            get_latest_bridge_pause_status_with_emergency_event(
683                iota_client.clone(),
684                event.clone(),
685                Duration::from_secs(2),
686            )
687            .await
688        );
689        assert!(timer.elapsed().as_millis() < 500);
690
691        let event = EmergencyOpEvent { frozen: false };
692        iota_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
693        let timer = std::time::Instant::now();
694        assert!(
695            !get_latest_bridge_pause_status_with_emergency_event(
696                iota_client.clone(),
697                event.clone(),
698                Duration::from_secs(2),
699            )
700            .await
701        );
702        assert!(timer.elapsed().as_millis() < 500);
703
704        // Test the case where the onchain status (paused) is older. Then update onchain
705        // status in 1 second. Since the retry interval is 2 seconds, it should
706        // return the next retry.
707        iota_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
708        let timer = std::time::Instant::now();
709        // update the bridge to unpaused in 1 second
710        let iota_client_mock_clone = iota_client_mock.clone();
711        tokio::spawn(async move {
712            tokio::time::sleep(Duration::from_secs(1)).await;
713            iota_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
714        });
715        assert!(
716            !get_latest_bridge_pause_status_with_emergency_event(
717                iota_client.clone(),
718                event.clone(),
719                Duration::from_secs(2),
720            )
721            .await
722        );
723        let elapsed = timer.elapsed().as_millis();
724        assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);
725
726        // Test the case where the onchain status (paused) is newer. It should retry up
727        // to REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
728        iota_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
729        let timer = std::time::Instant::now();
730        assert!(
731            get_latest_bridge_pause_status_with_emergency_event(
732                iota_client.clone(),
733                event.clone(),
734                Duration::from_secs(2),
735            )
736            .await
737        );
738        let elapsed = timer.elapsed().as_millis();
739        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
740    }
741
742    #[tokio::test]
743    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
744    async fn test_update_bridge_authority_aggregation_with_url_change_event() {
745        let (
746            monitor_tx,
747            monitor_rx,
748            iota_client_mock,
749            iota_client,
750            bridge_pause_tx,
751            _bridge_pause_rx,
752            mut authorities,
753        ) = setup();
754        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
755        let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
756            Arc::new(old_committee),
757        ))));
758        let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
759        let _handle = tokio::task::spawn(
760            BridgeMonitor::new(
761                iota_client.clone(),
762                monitor_rx,
763                agg.clone(),
764                bridge_pause_tx,
765                iota_token_type_tags,
766            )
767            .run(),
768        );
769        let new_url = "http://new.url".to_string();
770        authorities[0].base_url = new_url.clone();
771        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
772        let new_committee_summary =
773            bridge_committee_to_bridge_committee_summary(new_committee.clone());
774        iota_client_mock.set_bridge_committee(new_committee_summary.clone());
775        monitor_tx
776            .send(IotaBridgeEvent::CommitteeMemberUrlUpdateEvent(
777                CommitteeMemberUrlUpdateEvent {
778                    member: authorities[0].pubkey.clone(),
779                    new_url: new_url.clone(),
780                },
781            ))
782            .await
783            .unwrap();
784        // Wait for the monitor to process the event
785        tokio::time::sleep(Duration::from_secs(1)).await;
786        // Now expect the committee to be updated
787        assert_eq!(
788            agg.load()
789                .committee
790                .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
791                .unwrap()
792                .base_url,
793            new_url
794        );
795    }
796
797    #[tokio::test]
798    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
799    async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
800        let (
801            monitor_tx,
802            monitor_rx,
803            iota_client_mock,
804            iota_client,
805            bridge_pause_tx,
806            _bridge_pause_rx,
807            mut authorities,
808        ) = setup();
809        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
810        let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
811            Arc::new(old_committee),
812        ))));
813        let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
814        let _handle = tokio::task::spawn(
815            BridgeMonitor::new(
816                iota_client.clone(),
817                monitor_rx,
818                agg.clone(),
819                bridge_pause_tx,
820                iota_token_type_tags,
821            )
822            .run(),
823        );
824        authorities[0].is_blocklisted = true;
825        let to_blocklist = &authorities[0];
826        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
827        let new_committee_summary =
828            bridge_committee_to_bridge_committee_summary(new_committee.clone());
829        iota_client_mock.set_bridge_committee(new_committee_summary.clone());
830        monitor_tx
831            .send(IotaBridgeEvent::BlocklistValidatorEvent(
832                BlocklistValidatorEvent {
833                    public_keys: vec![to_blocklist.pubkey.clone()],
834                    blocklisted: true,
835                },
836            ))
837            .await
838            .unwrap();
839        // Wait for the monitor to process the event
840        tokio::time::sleep(Duration::from_secs(1)).await;
841        assert!(
842            agg.load()
843                .committee
844                .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
845                .unwrap()
846                .is_blocklisted,
847        );
848    }
849
850    #[tokio::test]
851    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
852    async fn test_update_bridge_pause_status_with_emergency_event() {
853        let (
854            monitor_tx,
855            monitor_rx,
856            iota_client_mock,
857            iota_client,
858            bridge_pause_tx,
859            bridge_pause_rx,
860            authorities,
861        ) = setup();
862        let event = EmergencyOpEvent {
863            frozen: !*bridge_pause_tx.borrow(), // toggle the bridge pause status
864        };
865        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
866        let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
867            Arc::new(committee),
868        ))));
869        let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
870        let _handle = tokio::task::spawn(
871            BridgeMonitor::new(
872                iota_client.clone(),
873                monitor_rx,
874                agg.clone(),
875                bridge_pause_tx,
876                iota_token_type_tags,
877            )
878            .run(),
879        );
880
881        iota_client_mock.set_is_bridge_paused(event.frozen);
882        monitor_tx
883            .send(IotaBridgeEvent::EmergencyOpEvent(event.clone()))
884            .await
885            .unwrap();
886        // Wait for the monitor to process the event
887        tokio::time::sleep(Duration::from_secs(1)).await;
888        // Now expect the committee to be updated
889        assert!(*bridge_pause_rx.borrow() == event.frozen);
890    }
891
892    #[tokio::test]
893    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
894    async fn test_update_iota_token_type_tags() {
895        let (
896            monitor_tx,
897            monitor_rx,
898            _iota_client_mock,
899            iota_client,
900            bridge_pause_tx,
901            _bridge_pause_rx,
902            authorities,
903        ) = setup();
904        let event = NewTokenEvent {
905            token_id: 255,
906            type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
907            native_token: false,
908            decimal_multiplier: 1000000,
909            notional_value: 100000000,
910        };
911        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
912        let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
913            Arc::new(committee),
914        ))));
915        let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
916        let iota_token_type_tags_clone = iota_token_type_tags.clone();
917        let _handle = tokio::task::spawn(
918            BridgeMonitor::new(
919                iota_client.clone(),
920                monitor_rx,
921                agg.clone(),
922                bridge_pause_tx,
923                iota_token_type_tags_clone,
924            )
925            .run(),
926        );
927
928        monitor_tx
929            .send(IotaBridgeEvent::NewTokenEvent(event.clone()))
930            .await
931            .unwrap();
932        // Wait for the monitor to process the event
933        tokio::time::sleep(Duration::from_secs(1)).await;
934        // Now expect new token type tags to appear in iota_token_type_tags
935        iota_token_type_tags
936            .load()
937            .clone()
938            .get(&event.token_id)
939            .unwrap();
940    }
941
942    #[expect(clippy::type_complexity)]
943    fn setup() -> (
944        iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
945        iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
946        IotaMockClient,
947        Arc<IotaClient<IotaMockClient>>,
948        tokio::sync::watch::Sender<IsBridgePaused>,
949        tokio::sync::watch::Receiver<IsBridgePaused>,
950        Vec<BridgeAuthority>,
951    ) {
952        telemetry_subscribers::init_for_testing();
953        let registry = Registry::new();
954        iota_metrics::init_metrics(&registry);
955        init_all_struct_tags();
956
957        let iota_client_mock = IotaMockClient::default();
958        let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
959        let (monitor_tx, monitor_rx) = iota_metrics::metered_channel::channel(
960            10000,
961            &iota_metrics::get_metrics()
962                .unwrap()
963                .channel_inflight
964                .with_label_values(&["monitor_queue"]),
965        );
966        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
967        let authorities = vec![
968            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
969            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
970            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
971            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
972        ];
973        (
974            monitor_tx,
975            monitor_rx,
976            iota_client_mock,
977            iota_client,
978            bridge_pause_tx,
979            bridge_pause_rx,
980            authorities,
981        )
982    }
983}