1use std::{collections::HashMap, sync::Arc};
8
9use arc_swap::ArcSwap;
10use iota_types::TypeTag;
11use tokio::time::Duration;
12use tracing::{error, info, warn};
13
14use crate::{
15 client::bridge_authority_aggregator::BridgeAuthorityAggregator,
16 crypto::BridgeAuthorityPublicKeyBytes,
17 events::{
18 BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent, EmergencyOpEvent, IotaBridgeEvent,
19 },
20 iota_client::{IotaClient, IotaClientInner},
21 retry_with_max_elapsed_time,
22 types::{BridgeCommittee, IsBridgePaused},
23};
24
/// Initial value of the retry counter used by the `get_latest_*` helpers in
/// this file: the number of times on-chain state may be read and found to
/// disagree with the event being processed (each such read is followed by a
/// sleep) before the helper gives up and returns the last value it read.
const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
26
/// Consumes [`IotaBridgeEvent`]s from a channel and refreshes shared bridge
/// state (committee aggregator, pause flag, token type tags) in response.
///
/// `C` is the inner client implementation wrapped by [`IotaClient`].
pub struct BridgeMonitor<C> {
    /// Client used to re-read the bridge committee and bridge summary.
    iota_client: Arc<IotaClient<C>>,
    /// Source of the events processed by [`BridgeMonitor::run`].
    monitor_rx: iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
    /// Shared aggregator; replaced wholesale when a url-update or blocklist
    /// event is handled.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Watch channel on which the latest pause status is published after an
    /// emergency-op event.
    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
    /// Shared map from token id to type tag; replaced when a new-token event
    /// introduces a previously unseen id.
    iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
}
34
35impl<C> BridgeMonitor<C>
36where
37 C: IotaClientInner + 'static,
38{
39 pub fn new(
40 iota_client: Arc<IotaClient<C>>,
41 monitor_rx: iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
42 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
43 bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
44 iota_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
45 ) -> Self {
46 Self {
47 iota_client,
48 monitor_rx,
49 bridge_auth_agg,
50 bridge_paused_watch_tx,
51 iota_token_type_tags,
52 }
53 }
54
55 pub async fn run(self) {
56 tracing::info!("Starting BridgeMonitor");
57 let Self {
58 iota_client,
59 mut monitor_rx,
60 bridge_auth_agg,
61 bridge_paused_watch_tx,
62 iota_token_type_tags,
63 } = self;
64 let mut latest_token_config = (*iota_token_type_tags.load().clone()).clone();
65
66 while let Some(events) = monitor_rx.recv().await {
67 match events {
68 IotaBridgeEvent::IotaToEthTokenBridgeV1(_) => (),
69 IotaBridgeEvent::TokenTransferApproved(_) => (),
70 IotaBridgeEvent::TokenTransferClaimed(_) => (),
71 IotaBridgeEvent::TokenTransferAlreadyApproved(_) => (),
72 IotaBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
73 IotaBridgeEvent::TokenTransferLimitExceed(_) => {
74 }
76
77 IotaBridgeEvent::EmergencyOpEvent(event) => {
78 info!("Received EmergencyOpEvent: {:?}", event);
79 let is_paused = get_latest_bridge_pause_status_with_emergency_event(
80 iota_client.clone(),
81 event,
82 Duration::from_secs(10),
83 )
84 .await;
85 bridge_paused_watch_tx
86 .send(is_paused)
87 .expect("Bridge pause status watch channel should not be closed");
88 }
89
90 IotaBridgeEvent::CommitteeMemberRegistration(_) => (),
91 IotaBridgeEvent::CommitteeUpdateEvent(_) => (),
92
93 IotaBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
94 info!("Received CommitteeMemberUrlUpdateEvent: {:?}", event);
95 let new_committee = get_latest_bridge_committee_with_url_update_event(
96 iota_client.clone(),
97 event,
98 Duration::from_secs(10),
99 )
100 .await;
101 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new(
102 new_committee,
103 ))));
104 info!("Committee updated with CommitteeMemberUrlUpdateEvent");
105 }
106
107 IotaBridgeEvent::BlocklistValidatorEvent(event) => {
108 info!("Received BlocklistValidatorEvent: {:?}", event);
109 let new_committee = get_latest_bridge_committee_with_blocklist_event(
110 iota_client.clone(),
111 event,
112 Duration::from_secs(10),
113 )
114 .await;
115 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new(
116 new_committee,
117 ))));
118 info!("Committee updated with BlocklistValidatorEvent");
119 }
120
121 IotaBridgeEvent::TokenRegistrationEvent(_) => (),
122
123 IotaBridgeEvent::NewTokenEvent(event) => {
124 if let std::collections::hash_map::Entry::Vacant(entry) =
125 latest_token_config.entry(event.token_id)
127 {
128 entry.insert(event.type_name.clone());
129 iota_token_type_tags.store(Arc::new(latest_token_config.clone()));
130 } else {
131 assert_eq!(event.type_name, latest_token_config[&event.token_id]);
133 }
134 }
135
136 IotaBridgeEvent::UpdateTokenPriceEvent(_) => (),
137 }
138 }
139
140 panic!("BridgeMonitor channel was closed unexpectedly");
141 }
142}
143
144async fn get_latest_bridge_committee_with_url_update_event<C: IotaClientInner>(
145 iota_client: Arc<IotaClient<C>>,
146 event: CommitteeMemberUrlUpdateEvent,
147 staleness_retry_interval: Duration,
148) -> BridgeCommittee {
149 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
150 loop {
151 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
152 iota_client.get_bridge_committee(),
153 Duration::from_secs(600)
154 ) else {
155 error!("Failed to get bridge committee after retry");
156 continue;
157 };
158 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
159 let Some(member) = member else {
160 warn!(
165 "Committee member not found in the committee: {:?}",
166 event.member
167 );
168 return committee;
169 };
170 if member.base_url == event.new_url {
171 return committee;
172 }
173 tokio::time::sleep(staleness_retry_interval).await;
181 remaining_retry_times -= 1;
182 if remaining_retry_times == 0 {
183 warn!(
184 "Committee member url {:?} does not match onchain record {:?} after retry",
185 event.member, member
186 );
187 return committee;
188 }
189 }
190}
191
192async fn get_latest_bridge_committee_with_blocklist_event<C: IotaClientInner>(
193 iota_client: Arc<IotaClient<C>>,
194 event: BlocklistValidatorEvent,
195 staleness_retry_interval: Duration,
196) -> BridgeCommittee {
197 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
198 loop {
199 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
200 iota_client.get_bridge_committee(),
201 Duration::from_secs(600)
202 ) else {
203 error!("Failed to get bridge committee after retry");
204 continue;
205 };
206 let mut any_mismatch = false;
207 for pk in &event.public_keys {
208 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
209 let Some(member) = member else {
210 warn!("Committee member not found in the committee: {:?}", pk);
214 any_mismatch = true;
215 break;
216 };
217 if member.is_blocklisted != event.blocklisted {
218 warn!(
219 "Committee member blocklist status does not match onchain record: {:?}",
220 member
221 );
222 any_mismatch = true;
223 break;
224 }
225 }
226 if !any_mismatch {
227 return committee;
228 }
229 tokio::time::sleep(staleness_retry_interval).await;
237 remaining_retry_times -= 1;
238 if remaining_retry_times == 0 {
239 warn!(
240 "Committee member blocklist status {:?} does not match onchain record after retry",
241 event
242 );
243 return committee;
244 }
245 }
246}
247
248async fn get_latest_bridge_pause_status_with_emergency_event<C: IotaClientInner>(
249 iota_client: Arc<IotaClient<C>>,
250 event: EmergencyOpEvent,
251 staleness_retry_interval: Duration,
252) -> IsBridgePaused {
253 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
254 loop {
255 let Ok(Ok(summary)) = retry_with_max_elapsed_time!(
256 iota_client.get_bridge_summary(),
257 Duration::from_secs(600)
258 ) else {
259 error!("Failed to get bridge summary after retry");
260 continue;
261 };
262 if summary.is_frozen == event.frozen {
263 return summary.is_frozen;
264 }
265 tokio::time::sleep(staleness_retry_interval).await;
273 remaining_retry_times -= 1;
274 if remaining_retry_times == 0 {
275 warn!(
276 "Bridge pause status {:?} does not match onchain record {:?} after retry",
277 event, summary.is_frozen
278 );
279 return summary.is_frozen;
280 }
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use std::str::FromStr;
287
288 use fastcrypto::traits::KeyPair;
289 use iota_types::{
290 base_types::IotaAddress,
291 bridge::{BridgeCommitteeSummary, MoveTypeCommitteeMember},
292 crypto::{ToFromBytes, get_key_pair},
293 };
294 use prometheus::Registry;
295
296 use super::*;
297 use crate::{
298 events::{NewTokenEvent, init_all_struct_tags},
299 iota_mock_client::IotaMockClient,
300 test_utils::{bridge_committee_to_bridge_committee_summary, get_test_authority_and_key},
301 types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority, BridgeCommittee},
302 };
303
304 #[tokio::test]
305 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
306 async fn test_get_latest_bridge_committee_with_url_update_event() {
307 telemetry_subscribers::init_for_testing();
308 let iota_client_mock = IotaMockClient::default();
309 let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
310 let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
311 let pk = kp.public().clone();
312 let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
313 let pk_bytes = pk_as_bytes.as_bytes().to_vec();
314 let event = CommitteeMemberUrlUpdateEvent {
315 member: pk,
316 new_url: "http://new.url".to_string(),
317 };
318 let summary = BridgeCommitteeSummary {
319 members: vec![(
320 pk_bytes.clone(),
321 MoveTypeCommitteeMember {
322 iota_address: IotaAddress::random_for_testing_only(),
323 bridge_pubkey_bytes: pk_bytes.clone(),
324 voting_power: 10000,
325 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
326 blocklisted: false,
327 },
328 )],
329 member_registration: vec![],
330 last_committee_update_epoch: 0,
331 };
332
333 iota_client_mock.set_bridge_committee(summary.clone());
335 let timer = std::time::Instant::now();
336 let committee = get_latest_bridge_committee_with_url_update_event(
337 iota_client.clone(),
338 event.clone(),
339 Duration::from_secs(2),
340 )
341 .await;
342 assert_eq!(
343 committee.member(&pk_as_bytes).unwrap().base_url,
344 "http://new.url"
345 );
346 assert!(timer.elapsed().as_millis() < 500);
347
348 let old_summary = BridgeCommitteeSummary {
352 members: vec![(
353 pk_bytes.clone(),
354 MoveTypeCommitteeMember {
355 iota_address: IotaAddress::random_for_testing_only(),
356 bridge_pubkey_bytes: pk_bytes.clone(),
357 voting_power: 10000,
358 http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
359 blocklisted: false,
360 },
361 )],
362 member_registration: vec![],
363 last_committee_update_epoch: 0,
364 };
365 iota_client_mock.set_bridge_committee(old_summary.clone());
366 let timer = std::time::Instant::now();
367 let iota_client_mock_clone = iota_client_mock.clone();
369 tokio::spawn(async move {
370 tokio::time::sleep(Duration::from_secs(1)).await;
371 iota_client_mock_clone.set_bridge_committee(summary.clone());
372 });
373 let committee = get_latest_bridge_committee_with_url_update_event(
374 iota_client.clone(),
375 event.clone(),
376 Duration::from_secs(2),
377 )
378 .await;
379 assert_eq!(
380 committee.member(&pk_as_bytes).unwrap().base_url,
381 "http://new.url"
382 );
383 let elapsed = timer.elapsed().as_millis();
384 assert!(elapsed > 1000 && elapsed < 3000);
385
386 let newer_summary = BridgeCommitteeSummary {
389 members: vec![(
390 pk_bytes.clone(),
391 MoveTypeCommitteeMember {
392 iota_address: IotaAddress::random_for_testing_only(),
393 bridge_pubkey_bytes: pk_bytes.clone(),
394 voting_power: 10000,
395 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
396 blocklisted: false,
397 },
398 )],
399 member_registration: vec![],
400 last_committee_update_epoch: 0,
401 };
402 iota_client_mock.set_bridge_committee(newer_summary.clone());
403 let timer = std::time::Instant::now();
404 let committee = get_latest_bridge_committee_with_url_update_event(
405 iota_client.clone(),
406 event.clone(),
407 Duration::from_millis(500),
408 )
409 .await;
410 assert_eq!(
411 committee.member(&pk_as_bytes).unwrap().base_url,
412 "http://newer.url"
413 );
414 let elapsed = timer.elapsed().as_millis();
415 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
416
417 let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
420 let pk2 = kp2.public().clone();
421 let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
422 let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
423 let newer_summary = BridgeCommitteeSummary {
424 members: vec![(
425 pk_bytes2.clone(),
426 MoveTypeCommitteeMember {
427 iota_address: IotaAddress::random_for_testing_only(),
428 bridge_pubkey_bytes: pk_bytes2.clone(),
429 voting_power: 10000,
430 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
431 blocklisted: false,
432 },
433 )],
434 member_registration: vec![],
435 last_committee_update_epoch: 0,
436 };
437 iota_client_mock.set_bridge_committee(newer_summary.clone());
438 let timer = std::time::Instant::now();
439 let committee = get_latest_bridge_committee_with_url_update_event(
440 iota_client.clone(),
441 event.clone(),
442 Duration::from_secs(1),
443 )
444 .await;
445 assert_eq!(
446 committee.member(&pk_as_bytes2).unwrap().base_url,
447 "http://newer.url"
448 );
449 assert!(committee.member(&pk_as_bytes).is_none());
450 let elapsed = timer.elapsed().as_millis();
451 assert!(elapsed < 1000);
452 }
453
454 #[tokio::test]
455 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
456 async fn test_get_latest_bridge_committee_with_blocklist_event() {
457 telemetry_subscribers::init_for_testing();
458 let iota_client_mock = IotaMockClient::default();
459 let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
460 let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
461 let pk = kp.public().clone();
462 let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
463 let pk_bytes = pk_as_bytes.as_bytes().to_vec();
464
465 let event = BlocklistValidatorEvent {
467 blocklisted: true,
468 public_keys: vec![pk.clone()],
469 };
470 let summary = BridgeCommitteeSummary {
471 members: vec![(
472 pk_bytes.clone(),
473 MoveTypeCommitteeMember {
474 iota_address: IotaAddress::random_for_testing_only(),
475 bridge_pubkey_bytes: pk_bytes.clone(),
476 voting_power: 10000,
477 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
478 blocklisted: true,
479 },
480 )],
481 member_registration: vec![],
482 last_committee_update_epoch: 0,
483 };
484 iota_client_mock.set_bridge_committee(summary.clone());
485 let timer = std::time::Instant::now();
486 let committee = get_latest_bridge_committee_with_blocklist_event(
487 iota_client.clone(),
488 event.clone(),
489 Duration::from_secs(2),
490 )
491 .await;
492 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
493 assert!(timer.elapsed().as_millis() < 500);
494
495 let event = BlocklistValidatorEvent {
498 blocklisted: false,
499 public_keys: vec![pk.clone()],
500 };
501 let summary = BridgeCommitteeSummary {
502 members: vec![(
503 pk_bytes.clone(),
504 MoveTypeCommitteeMember {
505 iota_address: IotaAddress::random_for_testing_only(),
506 bridge_pubkey_bytes: pk_bytes.clone(),
507 voting_power: 10000,
508 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
509 blocklisted: false,
510 },
511 )],
512 member_registration: vec![],
513 last_committee_update_epoch: 0,
514 };
515 iota_client_mock.set_bridge_committee(summary.clone());
516 let timer = std::time::Instant::now();
517 let committee = get_latest_bridge_committee_with_blocklist_event(
518 iota_client.clone(),
519 event.clone(),
520 Duration::from_secs(2),
521 )
522 .await;
523 assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
524 assert!(timer.elapsed().as_millis() < 500);
525
526 let old_summary = BridgeCommitteeSummary {
530 members: vec![(
531 pk_bytes.clone(),
532 MoveTypeCommitteeMember {
533 iota_address: IotaAddress::random_for_testing_only(),
534 bridge_pubkey_bytes: pk_bytes.clone(),
535 voting_power: 10000,
536 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
537 blocklisted: true,
538 },
539 )],
540 member_registration: vec![],
541 last_committee_update_epoch: 0,
542 };
543 iota_client_mock.set_bridge_committee(old_summary.clone());
544 let timer = std::time::Instant::now();
545 let iota_client_mock_clone = iota_client_mock.clone();
547 tokio::spawn(async move {
548 tokio::time::sleep(Duration::from_secs(1)).await;
549 iota_client_mock_clone.set_bridge_committee(summary.clone());
550 });
551 let committee = get_latest_bridge_committee_with_blocklist_event(
552 iota_client.clone(),
553 event.clone(),
554 Duration::from_secs(2),
555 )
556 .await;
557 assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
558 let elapsed = timer.elapsed().as_millis();
559 assert!(elapsed > 1000 && elapsed < 3000);
560
561 let newer_summary = BridgeCommitteeSummary {
564 members: vec![(
565 pk_bytes.clone(),
566 MoveTypeCommitteeMember {
567 iota_address: IotaAddress::random_for_testing_only(),
568 bridge_pubkey_bytes: pk_bytes.clone(),
569 voting_power: 10000,
570 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
571 blocklisted: true,
572 },
573 )],
574 member_registration: vec![],
575 last_committee_update_epoch: 0,
576 };
577 iota_client_mock.set_bridge_committee(newer_summary.clone());
578 let timer = std::time::Instant::now();
579 let committee = get_latest_bridge_committee_with_blocklist_event(
580 iota_client.clone(),
581 event.clone(),
582 Duration::from_millis(500),
583 )
584 .await;
585 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
586 let elapsed = timer.elapsed().as_millis();
587 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
588
589 let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
592 let pk2 = kp2.public().clone();
593 let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
594 let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
595 let summary = BridgeCommitteeSummary {
596 members: vec![(
597 pk_bytes2.clone(),
598 MoveTypeCommitteeMember {
599 iota_address: IotaAddress::random_for_testing_only(),
600 bridge_pubkey_bytes: pk_bytes2.clone(),
601 voting_power: 10000,
602 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
603 blocklisted: false,
604 },
605 )],
606 member_registration: vec![],
607 last_committee_update_epoch: 0,
608 };
609 iota_client_mock.set_bridge_committee(summary.clone());
610 let timer = std::time::Instant::now();
611 let committee = get_latest_bridge_committee_with_blocklist_event(
612 iota_client.clone(),
613 event.clone(),
614 Duration::from_secs(1),
615 )
616 .await;
617 assert_eq!(
618 committee.member(&pk_as_bytes2).unwrap().base_url,
619 "http://newer.url"
620 );
621 assert!(committee.member(&pk_as_bytes).is_none());
622 let elapsed = timer.elapsed().as_millis();
623 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
624
625 let event = BlocklistValidatorEvent {
627 blocklisted: true,
628 public_keys: vec![pk, pk2],
629 };
630 let summary = BridgeCommitteeSummary {
631 members: vec![
632 (
633 pk_bytes.clone(),
634 MoveTypeCommitteeMember {
635 iota_address: IotaAddress::random_for_testing_only(),
636 bridge_pubkey_bytes: pk_bytes.clone(),
637 voting_power: 5000,
638 http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
639 blocklisted: true,
640 },
641 ),
642 (
643 pk_bytes2.clone(),
644 MoveTypeCommitteeMember {
645 iota_address: IotaAddress::random_for_testing_only(),
646 bridge_pubkey_bytes: pk_bytes2.clone(),
647 voting_power: 5000,
648 http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
649 blocklisted: false,
650 },
651 ),
652 ],
653 member_registration: vec![],
654 last_committee_update_epoch: 0,
655 };
656 iota_client_mock.set_bridge_committee(summary.clone());
657 let timer = std::time::Instant::now();
658 let committee = get_latest_bridge_committee_with_blocklist_event(
659 iota_client.clone(),
660 event.clone(),
661 Duration::from_millis(500),
662 )
663 .await;
664 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
665 assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
666 let elapsed = timer.elapsed().as_millis();
667 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
668 }
669
670 #[tokio::test]
671 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
672 async fn test_get_bridge_pause_status_with_emergency_event() {
673 telemetry_subscribers::init_for_testing();
674 let iota_client_mock = IotaMockClient::default();
675 let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
676
677 let event = EmergencyOpEvent { frozen: true };
679 iota_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
680 let timer = std::time::Instant::now();
681 assert!(
682 get_latest_bridge_pause_status_with_emergency_event(
683 iota_client.clone(),
684 event.clone(),
685 Duration::from_secs(2),
686 )
687 .await
688 );
689 assert!(timer.elapsed().as_millis() < 500);
690
691 let event = EmergencyOpEvent { frozen: false };
692 iota_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
693 let timer = std::time::Instant::now();
694 assert!(
695 !get_latest_bridge_pause_status_with_emergency_event(
696 iota_client.clone(),
697 event.clone(),
698 Duration::from_secs(2),
699 )
700 .await
701 );
702 assert!(timer.elapsed().as_millis() < 500);
703
704 iota_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
708 let timer = std::time::Instant::now();
709 let iota_client_mock_clone = iota_client_mock.clone();
711 tokio::spawn(async move {
712 tokio::time::sleep(Duration::from_secs(1)).await;
713 iota_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
714 });
715 assert!(
716 !get_latest_bridge_pause_status_with_emergency_event(
717 iota_client.clone(),
718 event.clone(),
719 Duration::from_secs(2),
720 )
721 .await
722 );
723 let elapsed = timer.elapsed().as_millis();
724 assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);
725
726 iota_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
729 let timer = std::time::Instant::now();
730 assert!(
731 get_latest_bridge_pause_status_with_emergency_event(
732 iota_client.clone(),
733 event.clone(),
734 Duration::from_secs(2),
735 )
736 .await
737 );
738 let elapsed = timer.elapsed().as_millis();
739 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
740 }
741
742 #[tokio::test]
743 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
744 async fn test_update_bridge_authority_aggregation_with_url_change_event() {
745 let (
746 monitor_tx,
747 monitor_rx,
748 iota_client_mock,
749 iota_client,
750 bridge_pause_tx,
751 _bridge_pause_rx,
752 mut authorities,
753 ) = setup();
754 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
755 let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
756 Arc::new(old_committee),
757 ))));
758 let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
759 let _handle = tokio::task::spawn(
760 BridgeMonitor::new(
761 iota_client.clone(),
762 monitor_rx,
763 agg.clone(),
764 bridge_pause_tx,
765 iota_token_type_tags,
766 )
767 .run(),
768 );
769 let new_url = "http://new.url".to_string();
770 authorities[0].base_url = new_url.clone();
771 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
772 let new_committee_summary =
773 bridge_committee_to_bridge_committee_summary(new_committee.clone());
774 iota_client_mock.set_bridge_committee(new_committee_summary.clone());
775 monitor_tx
776 .send(IotaBridgeEvent::CommitteeMemberUrlUpdateEvent(
777 CommitteeMemberUrlUpdateEvent {
778 member: authorities[0].pubkey.clone(),
779 new_url: new_url.clone(),
780 },
781 ))
782 .await
783 .unwrap();
784 tokio::time::sleep(Duration::from_secs(1)).await;
786 assert_eq!(
788 agg.load()
789 .committee
790 .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
791 .unwrap()
792 .base_url,
793 new_url
794 );
795 }
796
797 #[tokio::test]
798 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
799 async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
800 let (
801 monitor_tx,
802 monitor_rx,
803 iota_client_mock,
804 iota_client,
805 bridge_pause_tx,
806 _bridge_pause_rx,
807 mut authorities,
808 ) = setup();
809 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
810 let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
811 Arc::new(old_committee),
812 ))));
813 let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
814 let _handle = tokio::task::spawn(
815 BridgeMonitor::new(
816 iota_client.clone(),
817 monitor_rx,
818 agg.clone(),
819 bridge_pause_tx,
820 iota_token_type_tags,
821 )
822 .run(),
823 );
824 authorities[0].is_blocklisted = true;
825 let to_blocklist = &authorities[0];
826 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
827 let new_committee_summary =
828 bridge_committee_to_bridge_committee_summary(new_committee.clone());
829 iota_client_mock.set_bridge_committee(new_committee_summary.clone());
830 monitor_tx
831 .send(IotaBridgeEvent::BlocklistValidatorEvent(
832 BlocklistValidatorEvent {
833 public_keys: vec![to_blocklist.pubkey.clone()],
834 blocklisted: true,
835 },
836 ))
837 .await
838 .unwrap();
839 tokio::time::sleep(Duration::from_secs(1)).await;
841 assert!(
842 agg.load()
843 .committee
844 .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
845 .unwrap()
846 .is_blocklisted,
847 );
848 }
849
850 #[tokio::test]
851 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
852 async fn test_update_bridge_pause_status_with_emergency_event() {
853 let (
854 monitor_tx,
855 monitor_rx,
856 iota_client_mock,
857 iota_client,
858 bridge_pause_tx,
859 bridge_pause_rx,
860 authorities,
861 ) = setup();
862 let event = EmergencyOpEvent {
863 frozen: !*bridge_pause_tx.borrow(), };
865 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
866 let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
867 Arc::new(committee),
868 ))));
869 let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
870 let _handle = tokio::task::spawn(
871 BridgeMonitor::new(
872 iota_client.clone(),
873 monitor_rx,
874 agg.clone(),
875 bridge_pause_tx,
876 iota_token_type_tags,
877 )
878 .run(),
879 );
880
881 iota_client_mock.set_is_bridge_paused(event.frozen);
882 monitor_tx
883 .send(IotaBridgeEvent::EmergencyOpEvent(event.clone()))
884 .await
885 .unwrap();
886 tokio::time::sleep(Duration::from_secs(1)).await;
888 assert!(*bridge_pause_rx.borrow() == event.frozen);
890 }
891
892 #[tokio::test]
893 #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
894 async fn test_update_iota_token_type_tags() {
895 let (
896 monitor_tx,
897 monitor_rx,
898 _iota_client_mock,
899 iota_client,
900 bridge_pause_tx,
901 _bridge_pause_rx,
902 authorities,
903 ) = setup();
904 let event = NewTokenEvent {
905 token_id: 255,
906 type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
907 native_token: false,
908 decimal_multiplier: 1000000,
909 notional_value: 100000000,
910 };
911 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
912 let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
913 Arc::new(committee),
914 ))));
915 let iota_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
916 let iota_token_type_tags_clone = iota_token_type_tags.clone();
917 let _handle = tokio::task::spawn(
918 BridgeMonitor::new(
919 iota_client.clone(),
920 monitor_rx,
921 agg.clone(),
922 bridge_pause_tx,
923 iota_token_type_tags_clone,
924 )
925 .run(),
926 );
927
928 monitor_tx
929 .send(IotaBridgeEvent::NewTokenEvent(event.clone()))
930 .await
931 .unwrap();
932 tokio::time::sleep(Duration::from_secs(1)).await;
934 iota_token_type_tags
936 .load()
937 .clone()
938 .get(&event.token_id)
939 .unwrap();
940 }
941
942 #[expect(clippy::type_complexity)]
943 fn setup() -> (
944 iota_metrics::metered_channel::Sender<IotaBridgeEvent>,
945 iota_metrics::metered_channel::Receiver<IotaBridgeEvent>,
946 IotaMockClient,
947 Arc<IotaClient<IotaMockClient>>,
948 tokio::sync::watch::Sender<IsBridgePaused>,
949 tokio::sync::watch::Receiver<IsBridgePaused>,
950 Vec<BridgeAuthority>,
951 ) {
952 telemetry_subscribers::init_for_testing();
953 let registry = Registry::new();
954 iota_metrics::init_metrics(®istry);
955 init_all_struct_tags();
956
957 let iota_client_mock = IotaMockClient::default();
958 let iota_client = Arc::new(IotaClient::new_for_testing(iota_client_mock.clone()));
959 let (monitor_tx, monitor_rx) = iota_metrics::metered_channel::channel(
960 10000,
961 &iota_metrics::get_metrics()
962 .unwrap()
963 .channel_inflight
964 .with_label_values(&["monitor_queue"]),
965 );
966 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
967 let authorities = vec![
968 get_test_authority_and_key(2500, 0 ).0,
969 get_test_authority_and_key(2500, 0 ).0,
970 get_test_authority_and_key(2500, 0 ).0,
971 get_test_authority_and_key(2500, 0 ).0,
972 ];
973 (
974 monitor_tx,
975 monitor_rx,
976 iota_client_mock,
977 iota_client,
978 bridge_pause_tx,
979 bridge_pause_rx,
980 authorities,
981 )
982 }
983}