consensus_core/
authority_node.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Instant};

use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use iota_protocol_config::{ConsensusNetwork, ProtocolConfig};
use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::Registry;
use tracing::{info, warn};

use crate::{
    CommitConsumer, CommitConsumerMonitor,
    authority_service::AuthorityService,
    block_manager::BlockManager,
    block_verifier::SignedBlockVerifier,
    broadcaster::Broadcaster,
    commit_observer::CommitObserver,
    commit_syncer::{CommitSyncer, CommitSyncerHandle},
    commit_vote_monitor::CommitVoteMonitor,
    context::{Clock, Context},
    core::{Core, CoreSignals},
    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
    dag_state::DagState,
    leader_schedule::LeaderSchedule,
    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
    metrics::initialise_metrics,
    network::{NetworkClient as _, NetworkManager, tonic_network::TonicManager},
    round_prober::{RoundProber, RoundProberHandle},
    storage::rocksdb_store::RocksDBStore,
    subscriber::Subscriber,
    synchronizer::{Synchronizer, SynchronizerHandle},
    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
};

/// ConsensusAuthority is used by IOTA to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller,
/// MysticetiManager.
pub enum ConsensusAuthority {
    #[expect(private_interfaces)]
    WithTonic(AuthorityNode<TonicManager>),
}

impl ConsensusAuthority {
    /// Starts the `ConsensusAuthority` for the specified network type.
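    ///
    /// Illustrative sketch (not a doctest), mirroring the test helper
    /// `make_authority` below; it assumes `own_index`, `committee`,
    /// `parameters`, `protocol_config`, the keypairs, and a `CommitConsumer`
    /// have been constructed elsewhere:
    /// ```ignore
    /// let authority = ConsensusAuthority::start(
    ///     ConsensusNetwork::Tonic,
    ///     /* epoch_start_timestamp_ms */ 0,
    ///     own_index,
    ///     committee,
    ///     parameters,
    ///     protocol_config,
    ///     protocol_keypair,
    ///     network_keypair,
    ///     Arc::new(Clock::default()),
    ///     Arc::new(NoopTransactionVerifier {}),
    ///     commit_consumer,
    ///     Registry::new(),
    ///     /* boot_counter */ 0,
    /// )
    /// .await;
    /// // ... participate in consensus ...
    /// authority.stop().await;
    /// ```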
    pub async fn start(
        network_type: ConsensusNetwork,
        epoch_start_timestamp_ms: u64,
        own_index: AuthorityIndex,
        committee: Committee,
        parameters: Parameters,
        protocol_config: ProtocolConfig,
        protocol_keypair: ProtocolKeyPair,
        network_keypair: NetworkKeyPair,
        clock: Arc<Clock>,
        transaction_verifier: Arc<dyn TransactionVerifier>,
        commit_consumer: CommitConsumer,
        registry: Registry,
        // A counter that keeps track of how many times the authority node has been booted while
        // the binary or the component that is calling the `ConsensusAuthority` has been
        // running. It is mostly used to decide whether amnesia recovery should run.
        // When `boot_counter` is 0, `ConsensusAuthority` will initiate amnesia
        // recovery if that is enabled in the parameters.
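        // (Illustrative, based on the tests below: a process passes 0 on its first
        // boot of the node and increments the counter on each in-process restart;
        // a fresh binary restart starts again from 0.)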
        boot_counter: u64,
    ) -> Self {
        match network_type {
            ConsensusNetwork::Tonic => {
                let authority = AuthorityNode::start(
                    epoch_start_timestamp_ms,
                    own_index,
                    committee,
                    parameters,
                    protocol_config,
                    protocol_keypair,
                    network_keypair,
                    clock,
                    transaction_verifier,
                    commit_consumer,
                    registry,
                    boot_counter,
                )
                .await;
                Self::WithTonic(authority)
            }
        }
    }

    /// Stops the authority node and all of its components.
    pub async fn stop(self) {
        match self {
            Self::WithTonic(authority) => authority.stop().await,
        }
    }

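    /// Returns a client for submitting transactions to this authority.
    ///
    /// Illustrative sketch (not a doctest), mirroring the committee tests
    /// below:
    /// ```ignore
    /// let client = authority.transaction_client();
    /// client.submit(vec![b"transaction bytes".to_vec()]).await.unwrap();
    /// ```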
    pub fn transaction_client(&self) -> Arc<TransactionClient> {
        match self {
            Self::WithTonic(authority) => authority.transaction_client(),
        }
    }

    /// Waits until the commit consumer has finished replaying previously
    /// committed sub-dags.
    pub async fn replay_complete(&self) {
        match self {
            Self::WithTonic(authority) => authority.replay_complete().await,
        }
    }

    #[cfg(test)]
    fn context(&self) -> &Arc<Context> {
        match self {
            Self::WithTonic(authority) => &authority.context,
        }
    }

    #[cfg(test)]
    fn sync_last_known_own_block_enabled(&self) -> bool {
        match self {
            Self::WithTonic(authority) => authority.sync_last_known_own_block,
        }
    }
}

pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    context: Arc<Context>,
    start_time: Instant,
    transaction_client: Arc<TransactionClient>,
    synchronizer: Arc<SynchronizerHandle>,
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,

    commit_syncer_handle: CommitSyncerHandle,
    round_prober_handle: Option<RoundProberHandle>,
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    core_thread_handle: CoreThreadHandle,
    // Only one of broadcaster and subscriber gets created, depending on
    // whether streaming is supported.
    broadcaster: Option<Broadcaster>,
    subscriber: Option<Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>>,
    network_manager: N,
    #[cfg(test)]
    sync_last_known_own_block: bool,
}

impl<N> AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    /// Initializes and starts the consensus authority node, ensuring that it
    /// is fully set up and ready to participate in the consensus process.
    pub(crate) async fn start(
        epoch_start_timestamp_ms: u64,
        own_index: AuthorityIndex,
        committee: Committee,
        parameters: Parameters,
        protocol_config: ProtocolConfig,
        // To avoid accidentally leaking the private key, the protocol key pair should only be
        // kept in Core.
        protocol_keypair: ProtocolKeyPair,
        network_keypair: NetworkKeyPair,
        clock: Arc<Clock>,
        transaction_verifier: Arc<dyn TransactionVerifier>,
        commit_consumer: CommitConsumer,
        registry: Registry,
        boot_counter: u64,
    ) -> Self {
        assert!(
            committee.is_valid_index(own_index),
            "Invalid own index {own_index}"
        );
        let own_hostname = &committee.authority(own_index).hostname;
        info!(
            "Starting consensus authority {own_index} {own_hostname}, {0:?}, epoch start timestamp {epoch_start_timestamp_ms}, boot counter {boot_counter}",
            protocol_config.version
        );
        info!(
            "Consensus authorities: {}",
            committee
                .authorities()
                .map(|(i, a)| format!("{}: {}", i, a.hostname))
                .join(", ")
        );
        info!("Consensus parameters: {:?}", parameters);
        info!("Consensus committee: {:?}", committee);
        let committee_size = committee.size();
        let context = Arc::new(Context::new(
            epoch_start_timestamp_ms,
            own_index,
            committee,
            parameters,
            protocol_config,
            initialise_metrics(registry, committee_size),
            clock,
        ));
        let start_time = Instant::now();

        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());

        let mut network_manager = N::new(context.clone(), network_keypair);
        let network_client = network_manager.client();

        // REQUIRED: the Broadcaster must be created before Core, so that it is already
        // listening on the broadcast channel and does not miss blocks, which would
        // cause test failures.
        let broadcaster = if N::Client::SUPPORT_STREAMING {
            None
        } else {
            Some(Broadcaster::new(
                context.clone(),
                network_client.clone(),
                &signals_receivers,
            ))
        };

        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
        let store = Arc::new(RocksDBStore::new(store_path));
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let highest_known_commit_at_startup = dag_state.read().last_commit_index();

        // Syncing the last known own block is enabled when:
        // 1. this is the first boot of the authority node (so it is disabled if e.g.
        //    the validator was already active in the previous epoch), and
        // 2. the timeout for syncing the last known own block is not set to zero.
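        // For example (illustrative): (boot_counter = 0, timeout = 2s) => enabled;
        // (boot_counter = 1, timeout = 2s) => disabled; (boot_counter = 0,
        // timeout = 0) => disabled.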
        let sync_last_known_own_block = boot_counter == 0
            && !context
                .parameters
                .sync_last_known_own_block_timeout
                .is_zero();
        info!("Sync last known own block: {sync_last_known_own_block}");

        let block_verifier = Arc::new(SignedBlockVerifier::new(
            context.clone(),
            transaction_verifier,
        ));

        let block_manager =
            BlockManager::new(context.clone(), dag_state.clone(), block_verifier.clone());

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let commit_consumer_monitor = commit_consumer.monitor();
        commit_consumer_monitor
            .set_highest_observed_commit_at_startup(highest_known_commit_at_startup);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            store.clone(),
            leader_schedule.clone(),
        );

        let core = Core::new(
            context.clone(),
            leader_schedule,
            tx_consumer,
            block_manager,
            // For streaming RPC, Core will be notified when a consumer is available.
            // For non-streaming RPC, there is no way to know, so default to true.
            // When there is only one (this) authority, assume a subscriber exists.
            !N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
            commit_observer,
            core_signals,
            protocol_keypair,
            dag_state.clone(),
            sync_last_known_own_block,
        );

        let (core_dispatcher, core_thread_handle) =
            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
        let core_dispatcher = Arc::new(core_dispatcher);
        let leader_timeout_handle =
            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());

        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        let synchronizer = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            sync_last_known_own_block,
        );

        let commit_syncer_handle = CommitSyncer::new(
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            network_client.clone(),
            block_verifier.clone(),
            dag_state.clone(),
        )
        .start();

        let round_prober_handle = if context.protocol_config.consensus_round_prober() {
            Some(
                RoundProber::new(
                    context.clone(),
                    core_dispatcher.clone(),
                    dag_state.clone(),
                    network_client.clone(),
                )
                .start(),
            )
        } else {
            None
        };

        let network_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            synchronizer.clone(),
            core_dispatcher,
            signals_receivers.block_broadcast_receiver(),
            dag_state.clone(),
            store,
        ));

        let subscriber = if N::Client::SUPPORT_STREAMING {
            let s = Subscriber::new(
                context.clone(),
                network_client,
                network_service.clone(),
                dag_state,
            );
            for (peer, _) in context.committee.authorities() {
                if peer != context.own_index {
                    s.subscribe(peer);
                }
            }
            Some(s)
        } else {
            None
        };

        network_manager.install_service(network_service).await;

        info!(
            "Consensus authority started, took {:?}",
            start_time.elapsed()
        );

        Self {
            context,
            start_time,
            transaction_client: Arc::new(tx_client),
            synchronizer,
            commit_consumer_monitor,
            commit_syncer_handle,
            round_prober_handle,
            leader_timeout_handle,
            core_thread_handle,
            broadcaster,
            subscriber,
            network_manager,
            #[cfg(test)]
            sync_last_known_own_block,
        }
    }

    pub(crate) async fn stop(mut self) {
        info!(
            "Stopping authority. Total run time: {:?}",
            self.start_time.elapsed()
        );

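        // Shutdown proceeds roughly in reverse order of creation: components calling
        // into Core are stopped first, then Core itself, then block broadcasting and
        // subscriptions, and finally the network manager.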
        // First shut down components calling into Core.
        if let Err(e) = self.synchronizer.stop().await {
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            }
            warn!(
                "Failed to stop synchronizer when shutting down consensus: {:?}",
                e
            );
        };
        self.commit_syncer_handle.stop().await;
        if let Some(round_prober_handle) = self.round_prober_handle.take() {
            round_prober_handle.stop().await;
        }
        self.leader_timeout_handle.stop().await;
        // Shutdown Core to stop block production and broadcast.
        // When using streaming, all subscribers to broadcasted blocks stop after this.
        self.core_thread_handle.stop().await;
        if let Some(mut broadcaster) = self.broadcaster.take() {
            broadcaster.stop();
        }
        // Stop outgoing long-lived streams before stopping the network server.
        if let Some(subscriber) = self.subscriber.take() {
            subscriber.stop();
        }
        self.network_manager.stop().await;

        self.context
            .metrics
            .node_metrics
            .uptime
            .observe(self.start_time.elapsed().as_secs_f64());
    }

    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
        self.transaction_client.clone()
    }

    pub(crate) async fn replay_complete(&self) {
        self.commit_consumer_monitor.replay_complete().await;
    }
}

#[cfg(test)]
mod tests {
    #![allow(non_snake_case)]

    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use consensus_config::{Parameters, local_committee_and_keys};
    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
    use iota_protocol_config::ProtocolConfig;
    use prometheus::Registry;
    use rstest::rstest;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};
    use typed_store::DBMetrics;

    use super::*;
    use crate::{
        CommittedSubDag,
        block::{BlockAPI as _, GENESIS_ROUND},
        transaction::NoopTransactionVerifier,
    };

    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
    ) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (sender, _receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            own_index,
            committee,
            parameters,
            ProtocolConfig::get_for_max_version_UNSAFE(),
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }

    // TODO: build AuthorityFixture.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_authority_committee(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(0, 5, 10)] gc_depth: u32,
    ) {
        let db_registry = Registry::new();
        DBMetrics::init(&db_registry);

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        if gc_depth == 0 {
            protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
            protocol_config
                .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
        }

        let temp_dirs = (0..NUM_OF_AUTHORITIES)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities = Vec::with_capacity(committee.size());
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        for (index, _authority_info) in committee.authorities() {
            let (authority, receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(receiver);
            authorities.push(authority);
        }

        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        for receiver in &mut output_receivers {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag =
                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
                        .await
                        .unwrap()
                        .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {txn:?}"
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }

        // Stop authority 1.
        let index = committee.to_authority_index(1).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 1 and let it run.
        let (authority, receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_small_committee(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(1, 2, 3)] num_authorities: usize,
    ) {
        let db_registry = Registry::new();
        DBMetrics::init(&db_registry);

        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();

        let temp_dirs = (0..num_authorities)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
        let mut boot_counters = vec![0; num_authorities];

        for (index, _authority_info) in committee.authorities() {
            let (authority, receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(receiver);
            authorities.push(authority);
        }

        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        for receiver in &mut output_receivers {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag =
                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
                        .await
                        .unwrap()
                        .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {txn:?}"
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }

        // Stop authority 0.
        let index = committee.to_authority_index(0).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 0 and let it run.
        let (authority, receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(
        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(0, 5, 10)] gc_depth: u32,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(&db_registry);

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut output_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        if gc_depth == 0 {
            protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
            protocol_config
                .set_consensus_median_timestamp_with_checkpoint_enforcement_for_testing(false);
        }

        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            assert!(
                authority.sync_last_known_own_block_enabled(),
                "Expected syncing of the last known own block to be enabled, as all authorities have an empty DB and are booting for the first time."
            );
            boot_counters[index] += 1;
            output_receivers.push(receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Take the receiver of authority 1 and wait until we see at least one
        // committed block authored by this authority. That way we can be sure
        // that at least one of its blocks has been proposed and successfully
        // received by a quorum of nodes.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), output_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authorities 1 & 2.
        // * Authority 1's DB will be wiped out, practically "forcing" amnesia
        //   recovery.
        // * Authority 2 is stopped to simulate less than f+1 availability, which
        //   makes authority 1 keep retrying during amnesia recovery until it finally
        //   manages to get back f+1 responses once authority 2 is up and running
        //   again.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Authority 1: create a new directory to simulate amnesia. The node has
        // participated in consensus before, but will now attempt to synchronize
        // its last own block and recover from there. It won't be able to do that
        // successfully while authority 2 is still down.
        let dir = TempDir::new().unwrap();
        // We reset the boot counter for this one to simulate a "binary" restart.
        boot_counters[index_1] = 0;
        let (authority, mut receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        assert!(
            authority.sync_last_known_own_block_enabled(),
            "Authority should have the sync of last own block enabled"
        );
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Now spin up authority 2 using its earlier directory - so no amnesia
        // recovery should be forced here. Authority 1 should then be able to
        // recover from amnesia successfully.
        let (authority, _receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            network_type,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        assert!(
            !authority.sync_last_known_own_block_enabled(),
            "Authority should not have attempted to sync the last own block"
        );
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait until we see at least one committed block authored by this
        // authority.
        'outer: while let Some(result) = receiver.recv().await {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }

    // TODO: create a fixture
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: ConsensusNetwork,
        boot_counter: u64,
        protocol_config: ProtocolConfig,
    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
        let registry = Registry::new();

        // Cache fewer blocks to exercise commit sync.
        let parameters = Parameters {
            db_path: db_dir.path().to_path_buf(),
            dag_state_cached_rounds: 5,
            commit_sync_parallel_fetches: 2,
            commit_sync_batch_size: 3,
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let protocol_keypair = keypairs[index].1.clone();
        let network_keypair = keypairs[index].0.clone();

        let (sender, receiver) = unbounded_channel("consensus_output");
        let commit_consumer = CommitConsumer::new(sender, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            index,
            committee,
            parameters,
            protocol_config,
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            boot_counter,
        )
        .await;

        (authority, receiver)
    }
}