1use std::{
8 collections::{BTreeMap, BTreeSet, btree_map::Entry},
9 sync::Arc,
10 time::Duration,
11};
12
13use iota_authority_aggregation::{ReduceOutput, quorum_map_then_reduce_with_timeout_and_prefs};
14use iota_types::{
15 base_types::ConciseableName,
16 committee::{StakeUnit, TOTAL_VOTING_POWER},
17};
18use tracing::{error, info, warn};
19
20use crate::{
21 client::bridge_client::BridgeClient,
22 crypto::{BridgeAuthorityPublicKeyBytes, BridgeAuthoritySignInfo},
23 error::{BridgeError, BridgeResult},
24 types::{
25 BridgeAction, BridgeCommittee, BridgeCommitteeValiditySignInfo, CertifiedBridgeAction,
26 VerifiedCertifiedBridgeAction, VerifiedSignedBridgeAction,
27 },
28};
29
/// Collects signatures for bridge actions from the members of a bridge committee.
pub struct BridgeAuthorityAggregator {
    /// Committee whose members are asked for signatures.
    pub committee: Arc<BridgeCommittee>,
    /// One client per committee member, keyed by the member's public key bytes.
    /// Members that are blocklisted, or for which a client could not be created,
    /// have no entry (see `new`).
    pub clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
}
34
35impl BridgeAuthorityAggregator {
36 pub fn new(committee: Arc<BridgeCommittee>) -> Self {
37 let clients: BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>> = committee
38 .members()
39 .iter()
40 .filter_map(|(name, authority)| {
41 if authority.is_blocklisted {
42 warn!("Ignored blocklisted authority {:?} (stake: {}) when creating BridgeAuthorityAggregator", name.concise(), authority.voting_power);
43 return None;
44 }
45 match BridgeClient::new(
47 name.clone(),
48 committee.clone(),
49 ) {
50 Ok(client) => Some((name.clone(), Arc::new(client))),
51 Err(e) => {
52 error!(
53 "Failed to create BridgeClient for {:?}: {:?}",
54 name.concise(),
55 e
56 );
57 None
58 }
59 }
60 })
61 .collect::<BTreeMap<_, _>>();
62 Self {
63 committee,
64 clients: Arc::new(clients),
65 }
66 }
67
68 pub async fn request_committee_signatures(
69 &self,
70 action: BridgeAction,
71 ) -> BridgeResult<VerifiedCertifiedBridgeAction> {
72 let state = GetSigsState::new(action.approval_threshold(), self.committee.clone());
73 request_sign_bridge_action_into_certification(
74 action,
75 self.committee.clone(),
76 self.clients.clone(),
77 state,
78 )
79 .await
80 }
81}
82
/// Running tally kept while signatures for a single bridge action are collected.
#[derive(Debug)]
struct GetSigsState {
    /// Stake accumulated through `add_bad_stake`.
    total_bad_stake: StakeUnit,
    /// Stake of the authorities whose signature has been recorded in `sigs`.
    total_ok_stake: StakeUnit,
    /// Signature info recorded so far, keyed by the signing authority.
    sigs: BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthoritySignInfo>,
    /// Amount of `total_ok_stake` at which a certificate is produced.
    validity_threshold: StakeUnit,
    /// Committee used for membership, voting power and blocklisted-stake lookups.
    committee: Arc<BridgeCommittee>,
}
91
92impl GetSigsState {
93 fn new(validity_threshold: StakeUnit, committee: Arc<BridgeCommittee>) -> Self {
94 Self {
95 committee,
96 total_bad_stake: 0,
97 total_ok_stake: 0,
98 sigs: BTreeMap::new(),
99 validity_threshold,
100 }
101 }
102
103 fn handle_verified_signed_action(
104 &mut self,
105 name: BridgeAuthorityPublicKeyBytes,
106 stake: StakeUnit,
107 signed_action: VerifiedSignedBridgeAction,
108 ) -> BridgeResult<Option<VerifiedCertifiedBridgeAction>> {
109 info!("Got signatures from {}, stake: {}", name.concise(), stake);
110 if !self.committee.is_active_member(&name) {
111 return Err(BridgeError::InvalidBridgeAuthority(name));
112 }
113
114 assert_eq!(stake, self.committee.member(&name).unwrap().voting_power);
117
118 match self.sigs.entry(name.clone()) {
119 Entry::Vacant(e) => {
120 e.insert(signed_action.auth_sig().clone());
121 self.total_ok_stake += stake;
122 }
123 Entry::Occupied(_e) => {
124 return Err(BridgeError::AuthoritySignatureDuplication(format!(
125 "Got signatures for the same authority twice: {}",
126 name.concise()
127 )));
128 }
129 }
130 if self.total_ok_stake >= self.validity_threshold {
131 info!(
132 "Got enough signatures from {} validators with total_ok_stake {}",
133 self.sigs.len(),
134 self.total_ok_stake
135 );
136 let signatures = self
137 .sigs
138 .iter()
139 .map(|(k, v)| (k.clone(), v.signature.clone()))
140 .collect::<BTreeMap<_, _>>();
141 let sig_info = BridgeCommitteeValiditySignInfo { signatures };
142 let certified_action: iota_types::message_envelope::Envelope<
143 BridgeAction,
144 BridgeCommitteeValiditySignInfo,
145 > = CertifiedBridgeAction::new_from_data_and_sig(
146 signed_action.into_inner().into_data(),
147 sig_info,
148 );
149 Ok(Some(VerifiedCertifiedBridgeAction::new_from_verified(
151 certified_action,
152 )))
153 } else {
154 Ok(None)
155 }
156 }
157
158 fn add_bad_stake(&mut self, bad_stake: StakeUnit) {
159 self.total_bad_stake += bad_stake;
160 }
161
162 fn is_too_many_error(&self) -> bool {
163 TOTAL_VOTING_POWER - self.total_bad_stake - self.committee.total_blocklisted_stake()
164 < self.validity_threshold
165 }
166}
167
168async fn request_sign_bridge_action_into_certification(
169 action: BridgeAction,
170 committee: Arc<BridgeCommittee>,
171 clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
172 state: GetSigsState,
173) -> BridgeResult<VerifiedCertifiedBridgeAction> {
174 let preference = match action {
184 BridgeAction::IotaToEthBridgeAction(_) => Some(BTreeSet::new()),
185 BridgeAction::EthToIotaBridgeAction(_) => None,
186 _ => {
187 if action.chain_id().is_iota_chain() {
188 None
189 } else {
190 Some(BTreeSet::new())
191 }
192 }
193 };
194 let (result, _) = quorum_map_then_reduce_with_timeout_and_prefs(
195 committee,
196 clients,
197 preference.as_ref(),
198 state,
199 |_name, client| {
200 Box::pin(async move { client.request_sign_bridge_action(action.clone()).await })
201 },
202 |mut state, name, stake, result| {
203 Box::pin(async move {
204 match result {
205 Ok(verified_signed_action) => {
206 match state.handle_verified_signed_action(
207 name.clone(),
208 stake,
209 verified_signed_action,
210 ) {
211 Ok(Some(certified_action)) => {
212 return ReduceOutput::Success(certified_action);
213 }
214 Ok(None) => (),
215 Err(e) => {
216 error!(
217 "Failed to handle verified signed action from {}: {:?}",
218 name.concise(),
219 e
220 );
221 state.add_bad_stake(stake);
222 }
223 }
224 }
225 Err(e) => {
226 warn!(
227 "Failed to get signature from {:?}. Error: {:?}",
228 name.concise(),
229 e
230 );
231 state.add_bad_stake(stake);
232 }
233 };
234
235 if state.is_too_many_error() {
237 ReduceOutput::Failed(state)
238 } else {
239 ReduceOutput::Continue(state)
240 }
241 })
242 },
243 Duration::from_secs(5),
245 )
246 .await
247 .map_err(|state| {
248 error!(
249 "Failed to get enough signatures, bad stake: {}, blocklisted stake: {}, good stake: {}, validity threshold: {}",
250 state.total_bad_stake,
251 state.committee.total_blocklisted_stake(),
252 state.total_ok_stake,
253 state.validity_threshold,
254 );
255 BridgeError::AuthoritySignatureAggregationTooManyErrors(format!(
256 "Failed to get enough signatures, bad stake: {}, blocklisted stake: {}, good stake: {}, validity threshold: {}",
257 state.total_bad_stake,
258 state.committee.total_blocklisted_stake(),
259 state.total_ok_stake,
260 state.validity_threshold,
261 ))
262 })?;
263 Ok(result)
264}
265
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use fastcrypto::traits::ToFromBytes;
    use iota_types::{committee::VALIDITY_THRESHOLD, digests::TransactionDigest};

    use super::*;
    use crate::{
        crypto::BridgeAuthorityPublicKey,
        server::mock_handler::BridgeRequestMockHandler,
        test_utils::{
            get_test_authorities_and_run_mock_bridge_server, get_test_authority_and_key,
            get_test_iota_to_eth_bridge_action, sign_action_with_key,
        },
        types::BridgeCommittee,
    };

    /// Checks which committee members end up with a client after construction.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_bridge_auth_agg_construction() {
        telemetry_subscribers::init_for_testing();

        let mut authorities: Vec<_> = (0..4)
            .map(|_| get_test_authority_and_key(2500, 12345).0)
            .collect();
        let client_keys = |aggregator: &BridgeAuthorityAggregator| {
            aggregator.clients.keys().cloned().collect::<BTreeSet<_>>()
        };

        // All four members get a client.
        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
        let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
        let expected = [0usize, 1, 2, 3]
            .into_iter()
            .map(|i| authorities[i].pubkey_bytes())
            .collect::<BTreeSet<_>>();
        assert_eq!(client_keys(&agg), expected);

        // A blocklisted member is left out.
        authorities[2].is_blocklisted = true;
        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
        let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
        let expected = [0usize, 1, 3]
            .into_iter()
            .map(|i| authorities[i].pubkey_bytes())
            .collect::<BTreeSet<_>>();
        assert_eq!(client_keys(&agg), expected);

        // An empty base URL on member 3 does not change the expected client set.
        authorities[3].base_url = "".into();
        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
        let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
        let expected = [0usize, 1, 3]
            .into_iter()
            .map(|i| authorities[i].pubkey_bytes())
            .collect::<BTreeSet<_>>();
        assert_eq!(client_keys(&agg), expected);
    }

    /// Aggregation keeps succeeding while members 3 and then 2 start failing, and
    /// fails once member 1 fails as well.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_bridge_auth_agg_ok() {
        telemetry_subscribers::init_for_testing();

        let mocks: Vec<_> = (0..4).map(|_| BridgeRequestMockHandler::new()).collect();

        let (_handles, authorities, secrets) =
            get_test_authorities_and_run_mock_bridge_server(vec![2500; 4], mocks.clone());

        let committee = BridgeCommittee::new(authorities).unwrap();

        let agg = BridgeAuthorityAggregator::new(Arc::new(committee));

        let iota_tx_digest = TransactionDigest::random();
        let iota_tx_event_index = 0;
        let nonce = 0;
        let amount = 1000;
        let action = get_test_iota_to_eth_bridge_action(
            Some(iota_tx_digest),
            Some(iota_tx_event_index),
            Some(nonce),
            Some(amount),
            None,
            None,
            None,
        );

        // Every member answers with its own valid signature.
        for (mock, secret) in mocks.iter().zip(secrets.iter()) {
            mock.add_iota_event_response(
                iota_tx_digest,
                iota_tx_event_index,
                Ok(sign_action_with_key(&action, secret)),
            );
        }
        agg.request_committee_signatures(action.clone())
            .await
            .unwrap();

        // Members 3 and then 2 start returning errors; aggregation still succeeds.
        for failing in [3usize, 2] {
            mocks[failing].add_iota_event_response(
                iota_tx_digest,
                iota_tx_event_index,
                Err(BridgeError::RestAPI("".into())),
            );
            agg.request_committee_signatures(action.clone())
                .await
                .unwrap();
        }

        // With member 1 failing as well, aggregation fails.
        mocks[1].add_iota_event_response(
            iota_tx_digest,
            iota_tx_event_index,
            Err(BridgeError::RestAPI("".into())),
        );
        let err = agg
            .request_committee_signatures(action.clone())
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            BridgeError::AuthoritySignatureAggregationTooManyErrors(_)
        ));
    }

    /// Aggregation with two blocklisted members: both remaining members are
    /// needed, and a response signed with another member's key does not help.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_bridge_auth_agg_more_cases() {
        telemetry_subscribers::init_for_testing();

        let mocks: Vec<_> = (0..4).map(|_| BridgeRequestMockHandler::new()).collect();

        let (_handles, mut authorities, secrets) =
            get_test_authorities_and_run_mock_bridge_server(vec![2500; 4], mocks.clone());
        for blocklisted in [0usize, 1] {
            authorities[blocklisted].is_blocklisted = true;
        }

        let committee = BridgeCommittee::new(authorities.clone()).unwrap();

        let agg = BridgeAuthorityAggregator::new(Arc::new(committee));

        let iota_tx_digest = TransactionDigest::random();
        let iota_tx_event_index = 0;
        let nonce = 0;
        let amount = 1000;
        let action = get_test_iota_to_eth_bridge_action(
            Some(iota_tx_digest),
            Some(iota_tx_event_index),
            Some(nonce),
            Some(amount),
            None,
            None,
            None,
        );

        // Only the two non-blocklisted members are programmed with responses.
        for signer in [2usize, 3] {
            mocks[signer].add_iota_event_response(
                iota_tx_digest,
                iota_tx_event_index,
                Ok(sign_action_with_key(&action, &secrets[signer])),
            );
        }
        let certified = agg
            .request_committee_signatures(action.clone())
            .await
            .unwrap();
        let signers = certified
            .auth_sig()
            .signatures
            .keys()
            .cloned()
            .collect::<BTreeSet<_>>();
        let expected_signers = [2usize, 3]
            .into_iter()
            .map(|i| authorities[i].pubkey_bytes())
            .collect::<BTreeSet<_>>();
        assert_eq!(signers, expected_signers);

        // Member 3 returns an error: aggregation fails.
        mocks[3].add_iota_event_response(
            iota_tx_digest,
            iota_tx_event_index,
            Err(BridgeError::RestAPI("".into())),
        );
        let err = agg
            .request_committee_signatures(action.clone())
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            BridgeError::AuthoritySignatureAggregationTooManyErrors(_)
        ));

        // Member 3 answers with a signature made with member 2's key: still fails.
        mocks[3].add_iota_event_response(
            iota_tx_digest,
            iota_tx_event_index,
            Ok(sign_action_with_key(&action, &secrets[2])),
        );
        let err = agg
            .request_committee_signatures(action.clone())
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            BridgeError::AuthoritySignatureAggregationTooManyErrors(_)
        ));
    }

    /// Exercises `GetSigsState` directly: bad-stake accounting, rejected
    /// signatures, and certificate creation once the threshold is reached.
    #[test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    fn test_get_sigs_state() {
        telemetry_subscribers::init_for_testing();

        let (mut authorities, secrets): (Vec<_>, Vec<_>) = (0..4)
            .map(|_| {
                let (authority, _, secret) = get_test_authority_and_key(2500, 12345);
                (authority, secret)
            })
            .unzip();

        // No blocklisted members: bad stake is added step by step.
        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
        let mut state = GetSigsState::new(VALIDITY_THRESHOLD, Arc::new(committee));

        assert!(!state.is_too_many_error());
        for (extra_bad_stake, too_many) in
            [(2500, false), (2500, false), (1666, false), (1, true)]
        {
            state.add_bad_stake(extra_bad_stake);
            assert_eq!(state.is_too_many_error(), too_many);
        }

        // With member 0 blocklisted, the limit is hit after the second addition.
        authorities[0].is_blocklisted = true;
        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
        let mut state = GetSigsState::new(VALIDITY_THRESHOLD, Arc::new(committee));

        assert!(!state.is_too_many_error());
        for (extra_bad_stake, too_many) in [(2500, false), (2500, true)] {
            state.add_bad_stake(extra_bad_stake);
            assert_eq!(state.is_too_many_error(), too_many);
        }

        // New committee: member 0 active again, uneven voting power for members
        // 1 and 2, member 3 blocklisted.
        authorities[0].is_blocklisted = false;
        authorities[1].voting_power = 1;
        authorities[2].voting_power = 4999;
        authorities[3].is_blocklisted = true;
        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
        let mut state = GetSigsState::new(VALIDITY_THRESHOLD, Arc::new(committee.clone()));

        let iota_tx_digest = TransactionDigest::random();
        let iota_tx_event_index = 0;
        let nonce = 0;
        let amount = 1000;
        let action = get_test_iota_to_eth_bridge_action(
            Some(iota_tx_digest),
            Some(iota_tx_event_index),
            Some(nonce),
            Some(amount),
            None,
            None,
            None,
        );

        // First signature from member 0: recorded, no certificate yet.
        let sig_0 = sign_action_with_key(&action, &secrets[0]);
        let outcome = state.handle_verified_signed_action(
            authorities[0].pubkey_bytes(),
            authorities[0].voting_power,
            VerifiedSignedBridgeAction::new_from_verified(sig_0),
        );
        assert!(outcome.unwrap().is_none());
        assert_eq!(state.total_ok_stake, 2500);

        // A second signature from member 0 is rejected as a duplicate.
        let new_sig_0 = sign_action_with_key(&action, &secrets[0]);
        let err = state
            .handle_verified_signed_action(
                authorities[0].pubkey_bytes(),
                authorities[0].voting_power,
                VerifiedSignedBridgeAction::new_from_verified(new_sig_0),
            )
            .unwrap_err();
        assert!(matches!(err, BridgeError::AuthoritySignatureDuplication(_)));
        assert_eq!(state.total_ok_stake, 2500);

        // A signer that is not part of the committee is rejected.
        let (unknown_authority, _, kp) = get_test_authority_and_key(2500, 12345);
        let unknown_sig = sign_action_with_key(&action, &kp);
        let err = state
            .handle_verified_signed_action(
                unknown_authority.pubkey_bytes(),
                authorities[0].voting_power,
                VerifiedSignedBridgeAction::new_from_verified(unknown_sig),
            )
            .unwrap_err();
        assert!(matches!(err, BridgeError::InvalidBridgeAuthority(_)));
        assert_eq!(state.total_ok_stake, 2500);

        // The blocklisted member 3 is rejected as well.
        let sig_3 = sign_action_with_key(&action, &secrets[3]);
        let err = state
            .handle_verified_signed_action(
                authorities[3].pubkey_bytes(),
                authorities[3].voting_power,
                VerifiedSignedBridgeAction::new_from_verified(sig_3),
            )
            .unwrap_err();
        assert!(matches!(err, BridgeError::InvalidBridgeAuthority(_)));
        assert_eq!(state.total_ok_stake, 2500);

        // Member 1 adds its single unit of stake: still no certificate.
        let sig_1 = sign_action_with_key(&action, &secrets[1]);
        let outcome = state.handle_verified_signed_action(
            authorities[1].pubkey_bytes(),
            authorities[1].voting_power,
            VerifiedSignedBridgeAction::new_from_verified(sig_1),
        );
        assert!(outcome.unwrap().is_none());
        assert_eq!(state.total_ok_stake, 2501);

        // Member 2 pushes the total over the threshold: a certificate is returned.
        let sig_2 = sign_action_with_key(&action, &secrets[2]);
        let certificate = state
            .handle_verified_signed_action(
                authorities[2].pubkey_bytes(),
                authorities[2].voting_power,
                VerifiedSignedBridgeAction::new_from_verified(sig_2),
            )
            .unwrap()
            .unwrap();
        assert_eq!(state.total_ok_stake, 7500);

        assert_eq!(certificate.data(), &action);
        let signers = certificate
            .auth_sig()
            .signatures
            .keys()
            .cloned()
            .collect::<BTreeSet<_>>();
        let expected_signers = [0usize, 1, 2]
            .into_iter()
            .map(|i| authorities[i].pubkey_bytes())
            .collect::<BTreeSet<_>>();
        assert_eq!(signers, expected_signers);

        // Every signature in the certificate verifies against the committee.
        for (pubkey, sig) in certificate.auth_sig().signatures.iter() {
            let sign_info = BridgeAuthoritySignInfo {
                authority_pub_key: BridgeAuthorityPublicKey::from_bytes(pubkey.as_ref()).unwrap(),
                signature: sig.clone(),
            };
            assert!(sign_info.verify(&action, &committee).is_ok());
        }
    }
}