1use std::{sync::Arc, time::Instant};
6
7use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
8use iota_protocol_config::{ConsensusNetwork, ProtocolConfig};
9use itertools::Itertools;
10use parking_lot::RwLock;
11use prometheus::Registry;
12use tracing::{info, warn};
13
14use crate::{
15 CommitConsumer, CommitConsumerMonitor,
16 authority_service::AuthorityService,
17 block_manager::BlockManager,
18 block_verifier::SignedBlockVerifier,
19 broadcaster::Broadcaster,
20 commit_observer::CommitObserver,
21 commit_syncer::{CommitSyncer, CommitSyncerHandle},
22 commit_vote_monitor::CommitVoteMonitor,
23 context::{Clock, Context},
24 core::{Core, CoreSignals},
25 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
26 dag_state::DagState,
27 leader_schedule::LeaderSchedule,
28 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
29 metrics::initialise_metrics,
30 network::{NetworkClient as _, NetworkManager, tonic_network::TonicManager},
31 round_prober::{RoundProber, RoundProberHandle},
32 storage::rocksdb_store::RocksDBStore,
33 subscriber::Subscriber,
34 synchronizer::{Synchronizer, SynchronizerHandle},
35 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
36};
37
38pub enum ConsensusAuthority {
42 #[expect(private_interfaces)]
43 WithTonic(AuthorityNode<TonicManager>),
44}
45
46impl ConsensusAuthority {
47 pub async fn start(
49 network_type: ConsensusNetwork,
50 own_index: AuthorityIndex,
51 committee: Committee,
52 parameters: Parameters,
53 protocol_config: ProtocolConfig,
54 protocol_keypair: ProtocolKeyPair,
55 network_keypair: NetworkKeyPair,
56 transaction_verifier: Arc<dyn TransactionVerifier>,
57 commit_consumer: CommitConsumer,
58 registry: Registry,
59 boot_counter: u64,
65 ) -> Self {
66 match network_type {
67 ConsensusNetwork::Tonic => {
68 let authority = AuthorityNode::start(
69 own_index,
70 committee,
71 parameters,
72 protocol_config,
73 protocol_keypair,
74 network_keypair,
75 transaction_verifier,
76 commit_consumer,
77 registry,
78 boot_counter,
79 )
80 .await;
81 Self::WithTonic(authority)
82 }
83 }
84 }
85
86 pub async fn stop(self) {
87 match self {
88 Self::WithTonic(authority) => authority.stop().await,
89 }
90 }
91
92 pub fn transaction_client(&self) -> Arc<TransactionClient> {
93 match self {
94 Self::WithTonic(authority) => authority.transaction_client(),
95 }
96 }
97
98 pub async fn replay_complete(&self) {
99 match self {
100 Self::WithTonic(authority) => authority.replay_complete().await,
101 }
102 }
103
104 #[cfg(test)]
105 fn context(&self) -> &Arc<Context> {
106 match self {
107 Self::WithTonic(authority) => &authority.context,
108 }
109 }
110
111 #[cfg(test)]
112 fn sync_last_known_own_block_enabled(&self) -> bool {
113 match self {
114 Self::WithTonic(authority) => authority.sync_last_known_own_block,
115 }
116 }
117}
118
119pub(crate) struct AuthorityNode<N>
120where
121 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
122{
123 context: Arc<Context>,
124 start_time: Instant,
125 transaction_client: Arc<TransactionClient>,
126 synchronizer: Arc<SynchronizerHandle>,
127 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
128
129 commit_syncer_handle: CommitSyncerHandle,
130 round_prober_handle: Option<RoundProberHandle>,
131 leader_timeout_handle: LeaderTimeoutTaskHandle,
132 core_thread_handle: CoreThreadHandle,
133 broadcaster: Option<Broadcaster>,
136 subscriber: Option<Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>>,
137 network_manager: N,
138 #[cfg(test)]
139 sync_last_known_own_block: bool,
140}
141
142impl<N> AuthorityNode<N>
143where
144 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
145{
146 pub(crate) async fn start(
150 own_index: AuthorityIndex,
151 committee: Committee,
152 parameters: Parameters,
153 protocol_config: ProtocolConfig,
154 protocol_keypair: ProtocolKeyPair,
157 network_keypair: NetworkKeyPair,
158 transaction_verifier: Arc<dyn TransactionVerifier>,
159 commit_consumer: CommitConsumer,
160 registry: Registry,
161 boot_counter: u64,
162 ) -> Self {
163 assert!(
164 committee.is_valid_index(own_index),
165 "Invalid own index {}",
166 own_index
167 );
168 let own_hostname = &committee.authority(own_index).hostname;
169 info!(
170 "Starting consensus authority {} {}, {:?}, boot counter {}",
171 own_index, own_hostname, protocol_config.version, boot_counter
172 );
173 info!(
174 "Consensus authorities: {}",
175 committee
176 .authorities()
177 .map(|(i, a)| format!("{}: {}", i, a.hostname))
178 .join(", ")
179 );
180 info!("Consensus parameters: {:?}", parameters);
181 info!("Consensus committee: {:?}", committee);
182 let context = Arc::new(Context::new(
183 own_index,
184 committee,
185 parameters,
186 protocol_config,
187 initialise_metrics(registry),
188 Arc::new(Clock::new()),
189 ));
190 let start_time = Instant::now();
191
192 let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
193 let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
194
195 let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
196
197 let mut network_manager = N::new(context.clone(), network_keypair);
198 let network_client = network_manager.client();
199
200 let broadcaster = if N::Client::SUPPORT_STREAMING {
203 None
204 } else {
205 Some(Broadcaster::new(
206 context.clone(),
207 network_client.clone(),
208 &signals_receivers,
209 ))
210 };
211
212 let store_path = context.parameters.db_path.as_path().to_str().unwrap();
213 let store = Arc::new(RocksDBStore::new(store_path));
214 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
215
216 let highest_known_commit_at_startup = dag_state.read().last_commit_index();
217
218 let sync_last_known_own_block = boot_counter == 0
219 && dag_state.read().highest_accepted_round() == 0
220 && !context
221 .parameters
222 .sync_last_known_own_block_timeout
223 .is_zero();
224 info!("Sync last known own block: {sync_last_known_own_block}");
225
226 let block_verifier = Arc::new(SignedBlockVerifier::new(
227 context.clone(),
228 transaction_verifier,
229 ));
230
231 let block_manager =
232 BlockManager::new(context.clone(), dag_state.clone(), block_verifier.clone());
233
234 let leader_schedule = Arc::new(LeaderSchedule::from_store(
235 context.clone(),
236 dag_state.clone(),
237 ));
238
239 let commit_consumer_monitor = commit_consumer.monitor();
240 commit_consumer_monitor
241 .set_highest_observed_commit_at_startup(highest_known_commit_at_startup);
242 let commit_observer = CommitObserver::new(
243 context.clone(),
244 commit_consumer,
245 dag_state.clone(),
246 store.clone(),
247 leader_schedule.clone(),
248 );
249
250 let core = Core::new(
251 context.clone(),
252 leader_schedule,
253 tx_consumer,
254 block_manager,
255 !N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
259 commit_observer,
260 core_signals,
261 protocol_keypair,
262 dag_state.clone(),
263 sync_last_known_own_block,
264 );
265
266 let (core_dispatcher, core_thread_handle) =
267 ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
268 let core_dispatcher = Arc::new(core_dispatcher);
269 let leader_timeout_handle =
270 LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
271
272 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
273
274 let synchronizer = Synchronizer::start(
275 network_client.clone(),
276 context.clone(),
277 core_dispatcher.clone(),
278 commit_vote_monitor.clone(),
279 block_verifier.clone(),
280 dag_state.clone(),
281 sync_last_known_own_block,
282 );
283
284 let commit_syncer_handle = CommitSyncer::new(
285 context.clone(),
286 core_dispatcher.clone(),
287 commit_vote_monitor.clone(),
288 commit_consumer_monitor.clone(),
289 network_client.clone(),
290 block_verifier.clone(),
291 dag_state.clone(),
292 )
293 .start();
294
295 let round_prober_handle = if context.protocol_config.consensus_round_prober() {
296 Some(
297 RoundProber::new(
298 context.clone(),
299 core_dispatcher.clone(),
300 dag_state.clone(),
301 network_client.clone(),
302 )
303 .start(),
304 )
305 } else {
306 None
307 };
308
309 let network_service = Arc::new(AuthorityService::new(
310 context.clone(),
311 block_verifier,
312 commit_vote_monitor,
313 synchronizer.clone(),
314 core_dispatcher,
315 signals_receivers.block_broadcast_receiver(),
316 dag_state.clone(),
317 store,
318 ));
319
320 let subscriber = if N::Client::SUPPORT_STREAMING {
321 let s = Subscriber::new(
322 context.clone(),
323 network_client,
324 network_service.clone(),
325 dag_state,
326 );
327 for (peer, _) in context.committee.authorities() {
328 if peer != context.own_index {
329 s.subscribe(peer);
330 }
331 }
332 Some(s)
333 } else {
334 None
335 };
336
337 network_manager.install_service(network_service).await;
338
339 info!(
340 "Consensus authority started, took {:?}",
341 start_time.elapsed()
342 );
343
344 Self {
345 context,
346 start_time,
347 transaction_client: Arc::new(tx_client),
348 synchronizer,
349 commit_consumer_monitor,
350 commit_syncer_handle,
351 round_prober_handle,
352 leader_timeout_handle,
353 core_thread_handle,
354 broadcaster,
355 subscriber,
356 network_manager,
357 #[cfg(test)]
358 sync_last_known_own_block,
359 }
360 }
361
362 pub(crate) async fn stop(mut self) {
363 info!(
364 "Stopping authority. Total run time: {:?}",
365 self.start_time.elapsed()
366 );
367
368 if let Err(e) = self.synchronizer.stop().await {
370 if e.is_panic() {
371 std::panic::resume_unwind(e.into_panic());
372 }
373 warn!(
374 "Failed to stop synchronizer when shutting down consensus: {:?}",
375 e
376 );
377 };
378 self.commit_syncer_handle.stop().await;
379 if let Some(round_prober_handle) = self.round_prober_handle.take() {
380 round_prober_handle.stop().await;
381 }
382 self.leader_timeout_handle.stop().await;
383 self.core_thread_handle.stop().await;
386 if let Some(mut broadcaster) = self.broadcaster.take() {
387 broadcaster.stop();
388 }
389 if let Some(subscriber) = self.subscriber.take() {
391 subscriber.stop();
392 }
393 self.network_manager.stop().await;
394
395 self.context
396 .metrics
397 .node_metrics
398 .uptime
399 .observe(self.start_time.elapsed().as_secs_f64());
400 }
401
402 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
403 self.transaction_client.clone()
404 }
405
406 pub(crate) async fn replay_complete(&self) {
407 self.commit_consumer_monitor.replay_complete().await;
408 }
409}
410
#[cfg(test)]
mod tests {
    // NOTE(review): presumably needed for identifiers generated by the
    // `rstest` case expansion — confirm before removing.
    #![allow(non_snake_case)]

    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use consensus_config::{Parameters, local_committee_and_keys};
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use iota_protocol_config::ProtocolConfig;
    use prometheus::Registry;
    use rstest::rstest;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};
    use typed_store::DBMetrics;

    use super::*;
    use crate::{
        CommittedSubDag,
        block::{BlockAPI as _, GENESIS_ROUND},
        transaction::NoopTransactionVerifier,
    };

    /// A single-member authority can be started, exposes the expected context
    /// and can be stopped again.
    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
    ) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        // Keep the `TempDir` guard alive for the whole test so the directory
        // is removed when the test ends instead of being left on disk.
        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            own_index,
            committee,
            parameters,
            ProtocolConfig::get_for_max_version_UNSAFE(),
            protocol_keypair,
            network_keypair,
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }

    /// A four-member committee commits all submitted transactions, and one
    /// authority can be stopped and restarted on the same database.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_authority_committee(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(0, 5, 10)] gc_depth: u32,
    ) {
        let db_registry = Registry::new();
        DBMetrics::init(&db_registry);

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        if gc_depth == 0 {
            protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
        }

        let temp_dirs = (0..NUM_OF_AUTHORITIES)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities = Vec::with_capacity(committee.size());
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        for (index, _authority_info) in committee.authorities() {
            let (authority, receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(receiver);
            authorities.push(authority);
        }

        submit_and_verify_transactions(&authorities, &mut output_receivers).await;

        // Stop authority 1.
        let index = committee.to_authority_index(1).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 1 on its existing database and let it run.
        let (authority, receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    /// Committees of one to three members commit all submitted transactions,
    /// and authority 0 can be stopped and restarted on the same database.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_small_committee(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(1, 2, 3)] num_authorities: usize,
    ) {
        let db_registry = Registry::new();
        DBMetrics::init(&db_registry);

        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();

        let temp_dirs = (0..num_authorities)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
        let mut boot_counters = vec![0; num_authorities];

        for (index, _authority_info) in committee.authorities() {
            let (authority, receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(receiver);
            authorities.push(authority);
        }

        submit_and_verify_transactions(&authorities, &mut output_receivers).await;

        // Stop authority 0.
        let index = committee.to_authority_index(0).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 0 on its existing database and let it run.
        let (authority, receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    /// An authority restarted with an empty database and a reset boot counter
    /// re-enables syncing of its last known own block and gets a block of its
    /// own committed again.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(0, 5, 10)] gc_depth: u32,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(&db_registry);

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut output_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        if gc_depth == 0 {
            protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
        }

        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            assert!(
                authority.sync_last_known_own_block_enabled(),
                "Expected syncing of last known own block to be enabled as all authorities are of empty db and boot for first time."
            );
            boot_counters[index] += 1;
            output_receivers.push(receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Wait until authority 1 has had at least one of its own non-genesis
        // blocks committed before taking it down.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), output_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authorities 1 and 2.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Restart authority 1 on a fresh directory with its boot counter reset
        // to zero, i.e. as if it had lost all of its state.
        let dir = TempDir::new().unwrap();
        boot_counters[index_1] = 0;
        let (authority, mut receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        assert!(
            authority.sync_last_known_own_block_enabled(),
            "Authority should have the sync of last own block enabled"
        );
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Restart authority 2 on its existing directory and with its
        // incremented boot counter.
        let (authority, _receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            network_type,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        assert!(
            !authority.sync_last_known_own_block_enabled(),
            "Authority should not have attempted to sync the last own block"
        );
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait until the restarted authority 1 gets one of its own non-genesis
        // blocks committed. The wait is bounded so that a failed recovery
        // fails the test instead of hanging it forever.
        'outer: while let Some(result) = timeout(Duration::from_secs(30), receiver.recv())
            .await
            .expect("Timed out while waiting for a committed block from the recovered authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }

    /// Submits 15 distinct transactions round-robin across `authorities`, then
    /// checks that every receiver in `output_receivers` observes each of them
    /// committed exactly once.
    ///
    /// Panics if a receiver yields no commit within one second, if its channel
    /// closes, if a committed transaction was not submitted or is seen twice,
    /// or if a committed sub-dag carries reputation scores.
    async fn submit_and_verify_transactions(
        authorities: &[ConsensusAuthority],
        output_receivers: &mut [UnboundedReceiver<CommittedSubDag>],
    ) {
        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        for receiver in output_receivers.iter_mut() {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag = timeout(Duration::from_secs(1), receiver.recv())
                    .await
                    .unwrap()
                    .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }
    }

    /// Starts a [`ConsensusAuthority`] for `index` with its database in
    /// `db_dir`, returning it together with the receiver of its committed
    /// sub-dags.
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: ConsensusNetwork,
        boot_counter: u64,
        protocol_config: ProtocolConfig,
    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
        let registry = Registry::new();

        // Only the path is borrowed from `db_dir`; the caller keeps ownership
        // of the directory guard, so the database can be reopened on restart.
        let parameters = Parameters {
            db_path: db_dir.path().to_path_buf(),
            dag_state_cached_rounds: 5,
            commit_sync_parallel_fetches: 2,
            commit_sync_batch_size: 3,
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let protocol_keypair = keypairs[index].1.clone();
        let network_keypair = keypairs[index].0.clone();

        let (sender, receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            index,
            committee,
            parameters,
            protocol_config,
            protocol_keypair,
            network_keypair,
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            boot_counter,
        )
        .await;

        (authority, receiver)
    }
}