1use std::{sync::Arc, time::Instant};
6
7use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
8use iota_protocol_config::{ConsensusNetwork, ProtocolConfig};
9use itertools::Itertools;
10use parking_lot::RwLock;
11use prometheus::Registry;
12use tracing::{info, warn};
13
14use crate::{
15 CommitConsumer, CommitConsumerMonitor,
16 authority_service::AuthorityService,
17 block_manager::BlockManager,
18 block_verifier::SignedBlockVerifier,
19 broadcaster::Broadcaster,
20 commit_observer::CommitObserver,
21 commit_syncer::{CommitSyncer, CommitSyncerHandle},
22 commit_vote_monitor::CommitVoteMonitor,
23 context::{Clock, Context},
24 core::{Core, CoreSignals},
25 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
26 dag_state::DagState,
27 leader_schedule::LeaderSchedule,
28 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
29 metrics::initialise_metrics,
30 network::{NetworkClient as _, NetworkManager, tonic_network::TonicManager},
31 round_prober::{RoundProber, RoundProberHandle},
32 storage::rocksdb_store::RocksDBStore,
33 subscriber::Subscriber,
34 synchronizer::{Synchronizer, SynchronizerHandle},
35 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
36};
37
38pub enum ConsensusAuthority {
42 #[expect(private_interfaces)]
43 WithTonic(AuthorityNode<TonicManager>),
44}
45
46impl ConsensusAuthority {
47 pub async fn start(
49 network_type: ConsensusNetwork,
50 epoch_start_timestamp_ms: u64,
51 own_index: AuthorityIndex,
52 committee: Committee,
53 parameters: Parameters,
54 protocol_config: ProtocolConfig,
55 protocol_keypair: ProtocolKeyPair,
56 network_keypair: NetworkKeyPair,
57 clock: Arc<Clock>,
58 transaction_verifier: Arc<dyn TransactionVerifier>,
59 commit_consumer: CommitConsumer,
60 registry: Registry,
61 boot_counter: u64,
67 ) -> Self {
68 match network_type {
69 ConsensusNetwork::Tonic => {
70 let authority = AuthorityNode::start(
71 epoch_start_timestamp_ms,
72 own_index,
73 committee,
74 parameters,
75 protocol_config,
76 protocol_keypair,
77 network_keypair,
78 clock,
79 transaction_verifier,
80 commit_consumer,
81 registry,
82 boot_counter,
83 )
84 .await;
85 Self::WithTonic(authority)
86 }
87 }
88 }
89
90 pub async fn stop(self) {
91 match self {
92 Self::WithTonic(authority) => authority.stop().await,
93 }
94 }
95
96 pub fn transaction_client(&self) -> Arc<TransactionClient> {
97 match self {
98 Self::WithTonic(authority) => authority.transaction_client(),
99 }
100 }
101
102 pub async fn replay_complete(&self) {
103 match self {
104 Self::WithTonic(authority) => authority.replay_complete().await,
105 }
106 }
107
108 #[cfg(test)]
109 fn context(&self) -> &Arc<Context> {
110 match self {
111 Self::WithTonic(authority) => &authority.context,
112 }
113 }
114
115 #[cfg(test)]
116 fn sync_last_known_own_block_enabled(&self) -> bool {
117 match self {
118 Self::WithTonic(authority) => authority.sync_last_known_own_block,
119 }
120 }
121}
122
123pub(crate) struct AuthorityNode<N>
124where
125 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
126{
127 context: Arc<Context>,
128 start_time: Instant,
129 transaction_client: Arc<TransactionClient>,
130 synchronizer: Arc<SynchronizerHandle>,
131 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
132
133 commit_syncer_handle: CommitSyncerHandle,
134 round_prober_handle: Option<RoundProberHandle>,
135 leader_timeout_handle: LeaderTimeoutTaskHandle,
136 core_thread_handle: CoreThreadHandle,
137 broadcaster: Option<Broadcaster>,
140 subscriber: Option<Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>>,
141 network_manager: N,
142 #[cfg(test)]
143 sync_last_known_own_block: bool,
144}
145
146impl<N> AuthorityNode<N>
147where
148 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
149{
150 pub(crate) async fn start(
154 epoch_start_timestamp_ms: u64,
155 own_index: AuthorityIndex,
156 committee: Committee,
157 parameters: Parameters,
158 protocol_config: ProtocolConfig,
159 protocol_keypair: ProtocolKeyPair,
162 network_keypair: NetworkKeyPair,
163 clock: Arc<Clock>,
164 transaction_verifier: Arc<dyn TransactionVerifier>,
165 commit_consumer: CommitConsumer,
166 registry: Registry,
167 boot_counter: u64,
168 ) -> Self {
169 assert!(
170 committee.is_valid_index(own_index),
171 "Invalid own index {own_index}"
172 );
173 let own_hostname = &committee.authority(own_index).hostname;
174 info!(
175 "Starting consensus authority {own_index} {own_hostname}, {0:?}, epoch start timestamp {epoch_start_timestamp_ms}, boot counter {boot_counter}",
176 protocol_config.version
177 );
178 info!(
179 "Consensus authorities: {}",
180 committee
181 .authorities()
182 .map(|(i, a)| format!("{}: {}", i, a.hostname))
183 .join(", ")
184 );
185 info!("Consensus parameters: {:?}", parameters);
186 info!("Consensus committee: {:?}", committee);
187 let committee_size = committee.size();
188 let context = Arc::new(Context::new(
189 epoch_start_timestamp_ms,
190 own_index,
191 committee,
192 parameters,
193 protocol_config,
194 initialise_metrics(registry, committee_size),
195 clock,
196 ));
197 let start_time = Instant::now();
198
199 let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
200 let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
201
202 let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
203
204 let mut network_manager = N::new(context.clone(), network_keypair);
205 let network_client = network_manager.client();
206
207 let broadcaster = if N::Client::SUPPORT_STREAMING {
210 None
211 } else {
212 Some(Broadcaster::new(
213 context.clone(),
214 network_client.clone(),
215 &signals_receivers,
216 ))
217 };
218
219 let store_path = context.parameters.db_path.as_path().to_str().unwrap();
220 let store = Arc::new(RocksDBStore::new(store_path));
221 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
222
223 let highest_known_commit_at_startup = dag_state.read().last_commit_index();
224
225 let sync_last_known_own_block = boot_counter == 0
230 && !context
231 .parameters
232 .sync_last_known_own_block_timeout
233 .is_zero();
234 info!("Sync last known own block: {sync_last_known_own_block}");
235
236 let block_verifier = Arc::new(SignedBlockVerifier::new(
237 context.clone(),
238 transaction_verifier,
239 ));
240
241 let block_manager =
242 BlockManager::new(context.clone(), dag_state.clone(), block_verifier.clone());
243
244 let leader_schedule = Arc::new(LeaderSchedule::from_store(
245 context.clone(),
246 dag_state.clone(),
247 ));
248
249 let commit_consumer_monitor = commit_consumer.monitor();
250 commit_consumer_monitor
251 .set_highest_observed_commit_at_startup(highest_known_commit_at_startup);
252 let commit_observer = CommitObserver::new(
253 context.clone(),
254 commit_consumer,
255 dag_state.clone(),
256 store.clone(),
257 leader_schedule.clone(),
258 );
259
260 let core = Core::new(
261 context.clone(),
262 leader_schedule,
263 tx_consumer,
264 block_manager,
265 !N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
269 commit_observer,
270 core_signals,
271 protocol_keypair,
272 dag_state.clone(),
273 sync_last_known_own_block,
274 );
275
276 let (core_dispatcher, core_thread_handle) =
277 ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
278 let core_dispatcher = Arc::new(core_dispatcher);
279 let leader_timeout_handle =
280 LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
281
282 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
283
284 let synchronizer = Synchronizer::start(
285 network_client.clone(),
286 context.clone(),
287 core_dispatcher.clone(),
288 commit_vote_monitor.clone(),
289 block_verifier.clone(),
290 dag_state.clone(),
291 sync_last_known_own_block,
292 );
293
294 let commit_syncer_handle = CommitSyncer::new(
295 context.clone(),
296 core_dispatcher.clone(),
297 commit_vote_monitor.clone(),
298 commit_consumer_monitor.clone(),
299 network_client.clone(),
300 block_verifier.clone(),
301 dag_state.clone(),
302 )
303 .start();
304
305 let round_prober_handle = if context.protocol_config.consensus_round_prober() {
306 Some(
307 RoundProber::new(
308 context.clone(),
309 core_dispatcher.clone(),
310 dag_state.clone(),
311 network_client.clone(),
312 )
313 .start(),
314 )
315 } else {
316 None
317 };
318
319 let network_service = Arc::new(AuthorityService::new(
320 context.clone(),
321 block_verifier,
322 commit_vote_monitor,
323 synchronizer.clone(),
324 core_dispatcher,
325 signals_receivers.block_broadcast_receiver(),
326 dag_state.clone(),
327 store,
328 ));
329
330 let subscriber = if N::Client::SUPPORT_STREAMING {
331 let s = Subscriber::new(
332 context.clone(),
333 network_client,
334 network_service.clone(),
335 dag_state,
336 );
337 for (peer, _) in context.committee.authorities() {
338 if peer != context.own_index {
339 s.subscribe(peer);
340 }
341 }
342 Some(s)
343 } else {
344 None
345 };
346
347 network_manager.install_service(network_service).await;
348
349 info!(
350 "Consensus authority started, took {:?}",
351 start_time.elapsed()
352 );
353
354 Self {
355 context,
356 start_time,
357 transaction_client: Arc::new(tx_client),
358 synchronizer,
359 commit_consumer_monitor,
360 commit_syncer_handle,
361 round_prober_handle,
362 leader_timeout_handle,
363 core_thread_handle,
364 broadcaster,
365 subscriber,
366 network_manager,
367 #[cfg(test)]
368 sync_last_known_own_block,
369 }
370 }
371
372 pub(crate) async fn stop(mut self) {
373 info!(
374 "Stopping authority. Total run time: {:?}",
375 self.start_time.elapsed()
376 );
377
378 if let Err(e) = self.synchronizer.stop().await {
380 if e.is_panic() {
381 std::panic::resume_unwind(e.into_panic());
382 }
383 warn!(
384 "Failed to stop synchronizer when shutting down consensus: {:?}",
385 e
386 );
387 };
388 self.commit_syncer_handle.stop().await;
389 if let Some(round_prober_handle) = self.round_prober_handle.take() {
390 round_prober_handle.stop().await;
391 }
392 self.leader_timeout_handle.stop().await;
393 self.core_thread_handle.stop().await;
396 if let Some(mut broadcaster) = self.broadcaster.take() {
397 broadcaster.stop();
398 }
399 if let Some(subscriber) = self.subscriber.take() {
401 subscriber.stop();
402 }
403 self.network_manager.stop().await;
404
405 self.context
406 .metrics
407 .node_metrics
408 .uptime
409 .observe(self.start_time.elapsed().as_secs_f64());
410 }
411
412 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
413 self.transaction_client.clone()
414 }
415
416 pub(crate) async fn replay_complete(&self) {
417 self.commit_consumer_monitor.replay_complete().await;
418 }
419}
420
421#[cfg(test)]
422mod tests {
423 #![allow(non_snake_case)]
424
425 use std::{
426 collections::{BTreeMap, BTreeSet},
427 sync::Arc,
428 time::Duration,
429 };
430
431 use consensus_config::{Parameters, local_committee_and_keys};
432 use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
433 use iota_protocol_config::ProtocolConfig;
434 use prometheus::Registry;
435 use rstest::rstest;
436 use tempfile::TempDir;
437 use tokio::time::{sleep, timeout};
438 use typed_store::DBMetrics;
439
440 use super::*;
441 use crate::{
442 CommittedSubDag,
443 block::{BlockAPI as _, GENESIS_ROUND},
444 transaction::NoopTransactionVerifier,
445 };
446
447 #[rstest]
448 #[tokio::test]
449 async fn test_authority_start_and_stop(
450 #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
451 ) {
452 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
453 let registry = Registry::new();
454
455 let temp_dir = TempDir::new().unwrap();
456 let parameters = Parameters {
457 db_path: temp_dir.keep(),
458 ..Default::default()
459 };
460 let txn_verifier = NoopTransactionVerifier {};
461
462 let own_index = committee.to_authority_index(0).unwrap();
463 let protocol_keypair = keypairs[own_index].1.clone();
464 let network_keypair = keypairs[own_index].0.clone();
465
466 let (sender, _receiver) = unbounded_channel("consensus_output");
467 let commit_consumer = CommitConsumer::new(sender, 0);
468
469 let authority = ConsensusAuthority::start(
470 network_type,
471 0,
472 own_index,
473 committee,
474 parameters,
475 ProtocolConfig::get_for_max_version_UNSAFE(),
476 protocol_keypair,
477 network_keypair,
478 Arc::new(Clock::default()),
479 Arc::new(txn_verifier),
480 commit_consumer,
481 registry,
482 0,
483 )
484 .await;
485
486 assert_eq!(authority.context().own_index, own_index);
487 assert_eq!(authority.context().committee.epoch(), 0);
488 assert_eq!(authority.context().committee.size(), 1);
489
490 authority.stop().await;
491 }
492
493 #[rstest]
495 #[tokio::test(flavor = "current_thread")]
496 async fn test_authority_committee(
497 #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
498 #[values(0, 5, 10)] gc_depth: u32,
499 ) {
500 let db_registry = Registry::new();
501 DBMetrics::init(&db_registry);
502
503 const NUM_OF_AUTHORITIES: usize = 4;
504 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
505 let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
506 protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
507
508 if gc_depth == 0 {
509 protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
510 protocol_config
511 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
512 }
513
514 let temp_dirs = (0..NUM_OF_AUTHORITIES)
515 .map(|_| TempDir::new().unwrap())
516 .collect::<Vec<_>>();
517
518 let mut output_receivers = Vec::with_capacity(committee.size());
519 let mut authorities = Vec::with_capacity(committee.size());
520 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
521
522 for (index, _authority_info) in committee.authorities() {
523 let (authority, receiver) = make_authority(
524 index,
525 &temp_dirs[index.value()],
526 committee.clone(),
527 keypairs.clone(),
528 network_type,
529 boot_counters[index],
530 protocol_config.clone(),
531 )
532 .await;
533 boot_counters[index] += 1;
534 output_receivers.push(receiver);
535 authorities.push(authority);
536 }
537
538 const NUM_TRANSACTIONS: u8 = 15;
539 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
540 for i in 0..NUM_TRANSACTIONS {
541 let txn = vec![i; 16];
542 submitted_transactions.insert(txn.clone());
543 authorities[i as usize % authorities.len()]
544 .transaction_client()
545 .submit(vec![txn])
546 .await
547 .unwrap();
548 }
549
550 for receiver in &mut output_receivers {
551 let mut expected_transactions = submitted_transactions.clone();
552 loop {
553 let committed_subdag =
554 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
555 .await
556 .unwrap()
557 .unwrap();
558 for b in committed_subdag.blocks {
559 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
560 assert!(
561 expected_transactions.remove(&txn),
562 "Transaction not submitted or already seen: {txn:?}"
563 );
564 }
565 }
566 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
567 if expected_transactions.is_empty() {
568 break;
569 }
570 }
571 }
572
573 let index = committee.to_authority_index(1).unwrap();
575 authorities.remove(index.value()).stop().await;
576 sleep(Duration::from_secs(10)).await;
577
578 let (authority, receiver) = make_authority(
580 index,
581 &temp_dirs[index.value()],
582 committee.clone(),
583 keypairs.clone(),
584 network_type,
585 boot_counters[index],
586 protocol_config.clone(),
587 )
588 .await;
589 boot_counters[index] += 1;
590 output_receivers[index] = receiver;
591 authorities.insert(index.value(), authority);
592 sleep(Duration::from_secs(10)).await;
593
594 for authority in authorities {
596 authority.stop().await;
597 }
598 }
599
600 #[rstest]
601 #[tokio::test(flavor = "current_thread")]
602 async fn test_small_committee(
603 #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
604 #[values(1, 2, 3)] num_authorities: usize,
605 ) {
606 let db_registry = Registry::new();
607 DBMetrics::init(&db_registry);
608
609 let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
610 let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();
611
612 let temp_dirs = (0..num_authorities)
613 .map(|_| TempDir::new().unwrap())
614 .collect::<Vec<_>>();
615
616 let mut output_receivers = Vec::with_capacity(committee.size());
617 let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
618 let mut boot_counters = vec![0; num_authorities];
619
620 for (index, _authority_info) in committee.authorities() {
621 let (authority, receiver) = make_authority(
622 index,
623 &temp_dirs[index.value()],
624 committee.clone(),
625 keypairs.clone(),
626 network_type,
627 boot_counters[index],
628 protocol_config.clone(),
629 )
630 .await;
631 boot_counters[index] += 1;
632 output_receivers.push(receiver);
633 authorities.push(authority);
634 }
635
636 const NUM_TRANSACTIONS: u8 = 15;
637 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
638 for i in 0..NUM_TRANSACTIONS {
639 let txn = vec![i; 16];
640 submitted_transactions.insert(txn.clone());
641 authorities[i as usize % authorities.len()]
642 .transaction_client()
643 .submit(vec![txn])
644 .await
645 .unwrap();
646 }
647
648 for receiver in &mut output_receivers {
649 let mut expected_transactions = submitted_transactions.clone();
650 loop {
651 let committed_subdag =
652 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
653 .await
654 .unwrap()
655 .unwrap();
656 for b in committed_subdag.blocks {
657 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
658 assert!(
659 expected_transactions.remove(&txn),
660 "Transaction not submitted or already seen: {txn:?}"
661 );
662 }
663 }
664 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
665 if expected_transactions.is_empty() {
666 break;
667 }
668 }
669 }
670
671 let index = committee.to_authority_index(0).unwrap();
673 authorities.remove(index.value()).stop().await;
674 sleep(Duration::from_secs(10)).await;
675
676 let (authority, receiver) = make_authority(
678 index,
679 &temp_dirs[index.value()],
680 committee.clone(),
681 keypairs.clone(),
682 network_type,
683 boot_counters[index],
684 protocol_config.clone(),
685 )
686 .await;
687 boot_counters[index] += 1;
688 output_receivers[index] = receiver;
689 authorities.insert(index.value(), authority);
690 sleep(Duration::from_secs(10)).await;
691
692 for authority in authorities {
694 authority.stop().await;
695 }
696 }
697
698 #[rstest]
699 #[tokio::test(flavor = "current_thread")]
700 async fn test_amnesia_recovery_success(
701 #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
702 #[values(0, 5, 10)] gc_depth: u32,
703 ) {
704 telemetry_subscribers::init_for_testing();
705 let db_registry = Registry::new();
706 DBMetrics::init(&db_registry);
707
708 const NUM_OF_AUTHORITIES: usize = 4;
709 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
710 let mut output_receivers = vec![];
711 let mut authorities = BTreeMap::new();
712 let mut temp_dirs = BTreeMap::new();
713 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
714
715 let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
716 protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
717
718 if gc_depth == 0 {
719 protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
720 protocol_config
721 .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
722 }
723
724 for (index, _authority_info) in committee.authorities() {
725 let dir = TempDir::new().unwrap();
726 let (authority, receiver) = make_authority(
727 index,
728 &dir,
729 committee.clone(),
730 keypairs.clone(),
731 network_type,
732 boot_counters[index],
733 protocol_config.clone(),
734 )
735 .await;
736 assert!(
737 authority.sync_last_known_own_block_enabled(),
738 "Expected syncing of last known own block to be enabled as all authorities are of empty db and boot for first time."
739 );
740 boot_counters[index] += 1;
741 output_receivers.push(receiver);
742 authorities.insert(index, authority);
743 temp_dirs.insert(index, dir);
744 }
745
746 let index_1 = committee.to_authority_index(1).unwrap();
752 'outer: while let Some(result) =
753 timeout(Duration::from_secs(10), output_receivers[index_1].recv())
754 .await
755 .expect("Timed out while waiting for at least one committed block from authority 1")
756 {
757 for block in result.blocks {
758 if block.round() > GENESIS_ROUND && block.author() == index_1 {
759 break 'outer;
760 }
761 }
762 }
763
764 authorities.remove(&index_1).unwrap().stop().await;
773 let index_2 = committee.to_authority_index(2).unwrap();
774 authorities.remove(&index_2).unwrap().stop().await;
775 sleep(Duration::from_secs(5)).await;
776
777 let dir = TempDir::new().unwrap();
782 boot_counters[index_1] = 0;
784 let (authority, mut receiver) = make_authority(
785 index_1,
786 &dir,
787 committee.clone(),
788 keypairs.clone(),
789 network_type,
790 boot_counters[index_1],
791 protocol_config.clone(),
792 )
793 .await;
794 assert!(
795 authority.sync_last_known_own_block_enabled(),
796 "Authority should have the sync of last own block enabled"
797 );
798 boot_counters[index_1] += 1;
799 authorities.insert(index_1, authority);
800 temp_dirs.insert(index_1, dir);
801 sleep(Duration::from_secs(5)).await;
802
803 let (authority, _receiver) = make_authority(
807 index_2,
808 &temp_dirs[&index_2],
809 committee.clone(),
810 keypairs,
811 network_type,
812 boot_counters[index_2],
813 protocol_config.clone(),
814 )
815 .await;
816 assert!(
817 !authority.sync_last_known_own_block_enabled(),
818 "Authority should not have attempted to sync the last own block"
819 );
820 boot_counters[index_2] += 1;
821 authorities.insert(index_2, authority);
822 sleep(Duration::from_secs(5)).await;
823
824 'outer: while let Some(result) = receiver.recv().await {
827 for block in result.blocks {
828 if block.round() > GENESIS_ROUND && block.author() == index_1 {
829 break 'outer;
830 }
831 }
832 }
833
834 for (_, authority) in authorities {
836 authority.stop().await;
837 }
838 }
839
840 async fn make_authority(
842 index: AuthorityIndex,
843 db_dir: &TempDir,
844 committee: Committee,
845 keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
846 network_type: ConsensusNetwork,
847 boot_counter: u64,
848 protocol_config: ProtocolConfig,
849 ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
850 let registry = Registry::new();
851
852 let parameters = Parameters {
854 db_path: db_dir.path().to_path_buf(),
855 dag_state_cached_rounds: 5,
856 commit_sync_parallel_fetches: 2,
857 commit_sync_batch_size: 3,
858 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
859 ..Default::default()
860 };
861 let txn_verifier = NoopTransactionVerifier {};
862
863 let protocol_keypair = keypairs[index].1.clone();
864 let network_keypair = keypairs[index].0.clone();
865
866 let (sender, receiver) = unbounded_channel("consensus_output");
867 let commit_consumer = CommitConsumer::new(sender, 0);
868
869 let authority = ConsensusAuthority::start(
870 network_type,
871 0,
872 index,
873 committee,
874 parameters,
875 protocol_config,
876 protocol_keypair,
877 network_keypair,
878 Arc::new(Clock::default()),
879 Arc::new(txn_verifier),
880 commit_consumer,
881 registry,
882 boot_counter,
883 )
884 .await;
885
886 (authority, receiver)
887 }
888}