consensus_core/
authority_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Instant};
6
7use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
8use iota_protocol_config::{ConsensusNetwork, ProtocolConfig};
9use itertools::Itertools;
10use parking_lot::RwLock;
11use prometheus::Registry;
12use tracing::{info, warn};
13
14use crate::{
15    CommitConsumer, CommitConsumerMonitor,
16    authority_service::AuthorityService,
17    block_manager::BlockManager,
18    block_verifier::SignedBlockVerifier,
19    broadcaster::Broadcaster,
20    commit_observer::CommitObserver,
21    commit_syncer::{CommitSyncer, CommitSyncerHandle},
22    commit_vote_monitor::CommitVoteMonitor,
23    context::{Clock, Context},
24    core::{Core, CoreSignals},
25    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
26    dag_state::DagState,
27    leader_schedule::LeaderSchedule,
28    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
29    metrics::initialise_metrics,
30    network::{NetworkClient as _, NetworkManager, tonic_network::TonicManager},
31    round_prober::{RoundProber, RoundProberHandle},
32    storage::rocksdb_store::RocksDBStore,
33    subscriber::Subscriber,
34    synchronizer::{Synchronizer, SynchronizerHandle},
35    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
36};
37
/// ConsensusAuthority is used by IOTA to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller,
/// MysticetiManager.
///
/// Each variant wraps an `AuthorityNode` specialised for one network
/// implementation; the methods of this type forward to the wrapped node.
pub enum ConsensusAuthority {
    // `AuthorityNode` is declared `pub(crate)` while this enum is `pub`, so
    // naming it in a variant is presumably what triggers the
    // `private_interfaces` lint that is expected below.
    /// Authority node whose networking is provided by `TonicManager`.
    #[expect(private_interfaces)]
    WithTonic(AuthorityNode<TonicManager>),
}
45
46impl ConsensusAuthority {
47    /// Starts the `ConsensusAuthority` for the specified network type.
48    pub async fn start(
49        network_type: ConsensusNetwork,
50        own_index: AuthorityIndex,
51        committee: Committee,
52        parameters: Parameters,
53        protocol_config: ProtocolConfig,
54        protocol_keypair: ProtocolKeyPair,
55        network_keypair: NetworkKeyPair,
56        transaction_verifier: Arc<dyn TransactionVerifier>,
57        commit_consumer: CommitConsumer,
58        registry: Registry,
59        // A counter that keeps track of how many times the authority node has been booted while
60        // the binary or the component that is calling the `ConsensusAuthority` has been
61        // running. It's mostly useful to make decisions on whether amnesia recovery should
62        // run or not. When `boot_counter` is 0, then `ConsensusAuthority` will initiate
63        // the process of amnesia recovery if that's enabled in the parameters.
64        boot_counter: u64,
65    ) -> Self {
66        match network_type {
67            ConsensusNetwork::Tonic => {
68                let authority = AuthorityNode::start(
69                    own_index,
70                    committee,
71                    parameters,
72                    protocol_config,
73                    protocol_keypair,
74                    network_keypair,
75                    transaction_verifier,
76                    commit_consumer,
77                    registry,
78                    boot_counter,
79                )
80                .await;
81                Self::WithTonic(authority)
82            }
83        }
84    }
85
86    pub async fn stop(self) {
87        match self {
88            Self::WithTonic(authority) => authority.stop().await,
89        }
90    }
91
92    pub fn transaction_client(&self) -> Arc<TransactionClient> {
93        match self {
94            Self::WithTonic(authority) => authority.transaction_client(),
95        }
96    }
97
98    pub async fn replay_complete(&self) {
99        match self {
100            Self::WithTonic(authority) => authority.replay_complete().await,
101        }
102    }
103
104    #[cfg(test)]
105    fn context(&self) -> &Arc<Context> {
106        match self {
107            Self::WithTonic(authority) => &authority.context,
108        }
109    }
110
111    #[cfg(test)]
112    fn sync_last_known_own_block_enabled(&self) -> bool {
113        match self {
114            Self::WithTonic(authority) => authority.sync_last_known_own_block,
115        }
116    }
117}
118
/// A running consensus authority. It owns the shared context together with the
/// handles of the components started by `AuthorityNode::start`, so that `stop`
/// can shut them down one after the other.
///
/// `N` is the network manager implementation used by this node.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    /// Shared node context, built in `start` from the own index, committee,
    /// parameters, protocol config, metrics and clock.
    context: Arc<Context>,
    /// Captured in `start`; used to log the total run time and to record the
    /// uptime metric in `stop`.
    start_time: Instant,
    /// Client handed out to callers through `transaction_client()`.
    transaction_client: Arc<TransactionClient>,
    /// Synchronizer handle; it is the first component stopped in `stop`.
    synchronizer: Arc<SynchronizerHandle>,
    /// Monitor obtained from the commit consumer; backs `replay_complete()`.
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,

    /// Handle used to stop the commit syncer.
    commit_syncer_handle: CommitSyncerHandle,
    /// Present only when the protocol config enables the consensus round prober.
    round_prober_handle: Option<RoundProberHandle>,
    /// Handle used to stop the leader timeout task.
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    /// Handle used to stop the core thread.
    core_thread_handle: CoreThreadHandle,
    // Only one of broadcaster and subscriber gets created, depending on
    // if streaming is supported.
    broadcaster: Option<Broadcaster>,
    subscriber: Option<Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>>,
    /// Network manager the service is installed on; stopped last in `stop`.
    network_manager: N,
    /// Test-only copy of the start-up decision on syncing the last known own block.
    #[cfg(test)]
    sync_last_known_own_block: bool,
}
141
142impl<N> AuthorityNode<N>
143where
144    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
145{
146    /// This function initializes and starts the consensus authority node
147    /// It ensures that the authority node is fully initialized and
148    /// ready to participate in the consensus process.
149    pub(crate) async fn start(
150        own_index: AuthorityIndex,
151        committee: Committee,
152        parameters: Parameters,
153        protocol_config: ProtocolConfig,
154        // To avoid accidentally leaking the private key, the protocol key pair should only be
155        // kept in Core.
156        protocol_keypair: ProtocolKeyPair,
157        network_keypair: NetworkKeyPair,
158        transaction_verifier: Arc<dyn TransactionVerifier>,
159        commit_consumer: CommitConsumer,
160        registry: Registry,
161        boot_counter: u64,
162    ) -> Self {
163        assert!(
164            committee.is_valid_index(own_index),
165            "Invalid own index {}",
166            own_index
167        );
168        let own_hostname = &committee.authority(own_index).hostname;
169        info!(
170            "Starting consensus authority {} {}, {:?}, boot counter {}",
171            own_index, own_hostname, protocol_config.version, boot_counter
172        );
173        info!(
174            "Consensus authorities: {}",
175            committee
176                .authorities()
177                .map(|(i, a)| format!("{}: {}", i, a.hostname))
178                .join(", ")
179        );
180        info!("Consensus parameters: {:?}", parameters);
181        info!("Consensus committee: {:?}", committee);
182        let context = Arc::new(Context::new(
183            own_index,
184            committee,
185            parameters,
186            protocol_config,
187            initialise_metrics(registry),
188            Arc::new(Clock::new()),
189        ));
190        let start_time = Instant::now();
191
192        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
193        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
194
195        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
196
197        let mut network_manager = N::new(context.clone(), network_keypair);
198        let network_client = network_manager.client();
199
200        // REQUIRED: Broadcaster must be created before Core, to start listening on the
201        // broadcast channel in order to not miss blocks and cause test failures.
202        let broadcaster = if N::Client::SUPPORT_STREAMING {
203            None
204        } else {
205            Some(Broadcaster::new(
206                context.clone(),
207                network_client.clone(),
208                &signals_receivers,
209            ))
210        };
211
212        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
213        let store = Arc::new(RocksDBStore::new(store_path));
214        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
215
216        let highest_known_commit_at_startup = dag_state.read().last_commit_index();
217
218        let sync_last_known_own_block = boot_counter == 0
219            && dag_state.read().highest_accepted_round() == 0
220            && !context
221                .parameters
222                .sync_last_known_own_block_timeout
223                .is_zero();
224        info!("Sync last known own block: {sync_last_known_own_block}");
225
226        let block_verifier = Arc::new(SignedBlockVerifier::new(
227            context.clone(),
228            transaction_verifier,
229        ));
230
231        let block_manager =
232            BlockManager::new(context.clone(), dag_state.clone(), block_verifier.clone());
233
234        let leader_schedule = Arc::new(LeaderSchedule::from_store(
235            context.clone(),
236            dag_state.clone(),
237        ));
238
239        let commit_consumer_monitor = commit_consumer.monitor();
240        commit_consumer_monitor
241            .set_highest_observed_commit_at_startup(highest_known_commit_at_startup);
242        let commit_observer = CommitObserver::new(
243            context.clone(),
244            commit_consumer,
245            dag_state.clone(),
246            store.clone(),
247            leader_schedule.clone(),
248        );
249
250        let core = Core::new(
251            context.clone(),
252            leader_schedule,
253            tx_consumer,
254            block_manager,
255            // For streaming RPC, Core will be notified when consumer is available.
256            // For non-streaming RPC, there is no way to know so default to true.
257            // When there is only one (this) authority, assume subscriber exists.
258            !N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
259            commit_observer,
260            core_signals,
261            protocol_keypair,
262            dag_state.clone(),
263            sync_last_known_own_block,
264        );
265
266        let (core_dispatcher, core_thread_handle) =
267            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
268        let core_dispatcher = Arc::new(core_dispatcher);
269        let leader_timeout_handle =
270            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
271
272        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
273
274        let synchronizer = Synchronizer::start(
275            network_client.clone(),
276            context.clone(),
277            core_dispatcher.clone(),
278            commit_vote_monitor.clone(),
279            block_verifier.clone(),
280            dag_state.clone(),
281            sync_last_known_own_block,
282        );
283
284        let commit_syncer_handle = CommitSyncer::new(
285            context.clone(),
286            core_dispatcher.clone(),
287            commit_vote_monitor.clone(),
288            commit_consumer_monitor.clone(),
289            network_client.clone(),
290            block_verifier.clone(),
291            dag_state.clone(),
292        )
293        .start();
294
295        let round_prober_handle = if context.protocol_config.consensus_round_prober() {
296            Some(
297                RoundProber::new(
298                    context.clone(),
299                    core_dispatcher.clone(),
300                    dag_state.clone(),
301                    network_client.clone(),
302                )
303                .start(),
304            )
305        } else {
306            None
307        };
308
309        let network_service = Arc::new(AuthorityService::new(
310            context.clone(),
311            block_verifier,
312            commit_vote_monitor,
313            synchronizer.clone(),
314            core_dispatcher,
315            signals_receivers.block_broadcast_receiver(),
316            dag_state.clone(),
317            store,
318        ));
319
320        let subscriber = if N::Client::SUPPORT_STREAMING {
321            let s = Subscriber::new(
322                context.clone(),
323                network_client,
324                network_service.clone(),
325                dag_state,
326            );
327            for (peer, _) in context.committee.authorities() {
328                if peer != context.own_index {
329                    s.subscribe(peer);
330                }
331            }
332            Some(s)
333        } else {
334            None
335        };
336
337        network_manager.install_service(network_service).await;
338
339        info!(
340            "Consensus authority started, took {:?}",
341            start_time.elapsed()
342        );
343
344        Self {
345            context,
346            start_time,
347            transaction_client: Arc::new(tx_client),
348            synchronizer,
349            commit_consumer_monitor,
350            commit_syncer_handle,
351            round_prober_handle,
352            leader_timeout_handle,
353            core_thread_handle,
354            broadcaster,
355            subscriber,
356            network_manager,
357            #[cfg(test)]
358            sync_last_known_own_block,
359        }
360    }
361
362    pub(crate) async fn stop(mut self) {
363        info!(
364            "Stopping authority. Total run time: {:?}",
365            self.start_time.elapsed()
366        );
367
368        // First shutdown components calling into Core.
369        if let Err(e) = self.synchronizer.stop().await {
370            if e.is_panic() {
371                std::panic::resume_unwind(e.into_panic());
372            }
373            warn!(
374                "Failed to stop synchronizer when shutting down consensus: {:?}",
375                e
376            );
377        };
378        self.commit_syncer_handle.stop().await;
379        if let Some(round_prober_handle) = self.round_prober_handle.take() {
380            round_prober_handle.stop().await;
381        }
382        self.leader_timeout_handle.stop().await;
383        // Shutdown Core to stop block productions and broadcast.
384        // When using streaming, all subscribers to broadcasted blocks stop after this.
385        self.core_thread_handle.stop().await;
386        if let Some(mut broadcaster) = self.broadcaster.take() {
387            broadcaster.stop();
388        }
389        // Stop outgoing long lived streams before stopping network server.
390        if let Some(subscriber) = self.subscriber.take() {
391            subscriber.stop();
392        }
393        self.network_manager.stop().await;
394
395        self.context
396            .metrics
397            .node_metrics
398            .uptime
399            .observe(self.start_time.elapsed().as_secs_f64());
400    }
401
402    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
403        self.transaction_client.clone()
404    }
405
406    pub(crate) async fn replay_complete(&self) {
407        self.commit_consumer_monitor.replay_complete().await;
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    #![allow(non_snake_case)]
414
415    use std::{
416        collections::{BTreeMap, BTreeSet},
417        sync::Arc,
418        time::Duration,
419    };
420
421    use consensus_config::{Parameters, local_committee_and_keys};
422    use iota_metrics::monitored_mpsc::{UnboundedReceiver, unbounded_channel};
423    use iota_protocol_config::ProtocolConfig;
424    use prometheus::Registry;
425    use rstest::rstest;
426    use tempfile::TempDir;
427    use tokio::time::{sleep, timeout};
428    use typed_store::DBMetrics;
429
430    use super::*;
431    use crate::{
432        CommittedSubDag,
433        block::{BlockAPI as _, GENESIS_ROUND},
434        transaction::NoopTransactionVerifier,
435    };
436
437    #[rstest]
438    #[tokio::test]
439    async fn test_authority_start_and_stop(
440        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
441    ) {
442        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
443        let registry = Registry::new();
444
445        let temp_dir = TempDir::new().unwrap();
446        let parameters = Parameters {
447            db_path: temp_dir.into_path(),
448            ..Default::default()
449        };
450        let txn_verifier = NoopTransactionVerifier {};
451
452        let own_index = committee.to_authority_index(0).unwrap();
453        let protocol_keypair = keypairs[own_index].1.clone();
454        let network_keypair = keypairs[own_index].0.clone();
455
456        let (sender, _receiver) = unbounded_channel("consensus_output");
457        let commit_consumer = CommitConsumer::new(sender, 0);
458
459        let authority = ConsensusAuthority::start(
460            network_type,
461            own_index,
462            committee,
463            parameters,
464            ProtocolConfig::get_for_max_version_UNSAFE(),
465            protocol_keypair,
466            network_keypair,
467            Arc::new(txn_verifier),
468            commit_consumer,
469            registry,
470            0,
471        )
472        .await;
473
474        assert_eq!(authority.context().own_index, own_index);
475        assert_eq!(authority.context().committee.epoch(), 0);
476        assert_eq!(authority.context().committee.size(), 1);
477
478        authority.stop().await;
479    }
480
481    // TODO: build AuthorityFixture.
482    #[rstest]
483    #[tokio::test(flavor = "current_thread")]
484    async fn test_authority_committee(
485        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
486        #[values(0, 5, 10)] gc_depth: u32,
487    ) {
488        let db_registry = Registry::new();
489        DBMetrics::init(&db_registry);
490
491        const NUM_OF_AUTHORITIES: usize = 4;
492        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
493        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
494        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
495
496        if gc_depth == 0 {
497            protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
498        }
499
500        let temp_dirs = (0..NUM_OF_AUTHORITIES)
501            .map(|_| TempDir::new().unwrap())
502            .collect::<Vec<_>>();
503
504        let mut output_receivers = Vec::with_capacity(committee.size());
505        let mut authorities = Vec::with_capacity(committee.size());
506        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
507
508        for (index, _authority_info) in committee.authorities() {
509            let (authority, receiver) = make_authority(
510                index,
511                &temp_dirs[index.value()],
512                committee.clone(),
513                keypairs.clone(),
514                network_type,
515                boot_counters[index],
516                protocol_config.clone(),
517            )
518            .await;
519            boot_counters[index] += 1;
520            output_receivers.push(receiver);
521            authorities.push(authority);
522        }
523
524        const NUM_TRANSACTIONS: u8 = 15;
525        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
526        for i in 0..NUM_TRANSACTIONS {
527            let txn = vec![i; 16];
528            submitted_transactions.insert(txn.clone());
529            authorities[i as usize % authorities.len()]
530                .transaction_client()
531                .submit(vec![txn])
532                .await
533                .unwrap();
534        }
535
536        for receiver in &mut output_receivers {
537            let mut expected_transactions = submitted_transactions.clone();
538            loop {
539                let committed_subdag =
540                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
541                        .await
542                        .unwrap()
543                        .unwrap();
544                for b in committed_subdag.blocks {
545                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
546                        assert!(
547                            expected_transactions.remove(&txn),
548                            "Transaction not submitted or already seen: {:?}",
549                            txn
550                        );
551                    }
552                }
553                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
554                if expected_transactions.is_empty() {
555                    break;
556                }
557            }
558        }
559
560        // Stop authority 1.
561        let index = committee.to_authority_index(1).unwrap();
562        authorities.remove(index.value()).stop().await;
563        sleep(Duration::from_secs(10)).await;
564
565        // Restart authority 1 and let it run.
566        let (authority, receiver) = make_authority(
567            index,
568            &temp_dirs[index.value()],
569            committee.clone(),
570            keypairs.clone(),
571            network_type,
572            boot_counters[index],
573            protocol_config.clone(),
574        )
575        .await;
576        boot_counters[index] += 1;
577        output_receivers[index] = receiver;
578        authorities.insert(index.value(), authority);
579        sleep(Duration::from_secs(10)).await;
580
581        // Stop all authorities and exit.
582        for authority in authorities {
583            authority.stop().await;
584        }
585    }
586
587    #[rstest]
588    #[tokio::test(flavor = "current_thread")]
589    async fn test_small_committee(
590        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
591        #[values(1, 2, 3)] num_authorities: usize,
592    ) {
593        let db_registry = Registry::new();
594        DBMetrics::init(&db_registry);
595
596        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
597        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();
598
599        let temp_dirs = (0..num_authorities)
600            .map(|_| TempDir::new().unwrap())
601            .collect::<Vec<_>>();
602
603        let mut output_receivers = Vec::with_capacity(committee.size());
604        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
605        let mut boot_counters = vec![0; num_authorities];
606
607        for (index, _authority_info) in committee.authorities() {
608            let (authority, receiver) = make_authority(
609                index,
610                &temp_dirs[index.value()],
611                committee.clone(),
612                keypairs.clone(),
613                network_type,
614                boot_counters[index],
615                protocol_config.clone(),
616            )
617            .await;
618            boot_counters[index] += 1;
619            output_receivers.push(receiver);
620            authorities.push(authority);
621        }
622
623        const NUM_TRANSACTIONS: u8 = 15;
624        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
625        for i in 0..NUM_TRANSACTIONS {
626            let txn = vec![i; 16];
627            submitted_transactions.insert(txn.clone());
628            authorities[i as usize % authorities.len()]
629                .transaction_client()
630                .submit(vec![txn])
631                .await
632                .unwrap();
633        }
634
635        for receiver in &mut output_receivers {
636            let mut expected_transactions = submitted_transactions.clone();
637            loop {
638                let committed_subdag =
639                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
640                        .await
641                        .unwrap()
642                        .unwrap();
643                for b in committed_subdag.blocks {
644                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
645                        assert!(
646                            expected_transactions.remove(&txn),
647                            "Transaction not submitted or already seen: {:?}",
648                            txn
649                        );
650                    }
651                }
652                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
653                if expected_transactions.is_empty() {
654                    break;
655                }
656            }
657        }
658
659        // Stop authority 0.
660        let index = committee.to_authority_index(0).unwrap();
661        authorities.remove(index.value()).stop().await;
662        sleep(Duration::from_secs(10)).await;
663
664        // Restart authority 0 and let it run.
665        let (authority, receiver) = make_authority(
666            index,
667            &temp_dirs[index.value()],
668            committee.clone(),
669            keypairs.clone(),
670            network_type,
671            boot_counters[index],
672            protocol_config.clone(),
673        )
674        .await;
675        boot_counters[index] += 1;
676        output_receivers[index] = receiver;
677        authorities.insert(index.value(), authority);
678        sleep(Duration::from_secs(10)).await;
679
680        // Stop all authorities and exit.
681        for authority in authorities {
682            authority.stop().await;
683        }
684    }
685
686    #[rstest]
687    #[tokio::test(flavor = "current_thread")]
688    async fn test_amnesia_recovery_success(
689        #[values(ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
690        #[values(0, 5, 10)] gc_depth: u32,
691    ) {
692        telemetry_subscribers::init_for_testing();
693        let db_registry = Registry::new();
694        DBMetrics::init(&db_registry);
695
696        const NUM_OF_AUTHORITIES: usize = 4;
697        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
698        let mut output_receivers = vec![];
699        let mut authorities = BTreeMap::new();
700        let mut temp_dirs = BTreeMap::new();
701        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
702
703        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
704        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
705
706        if gc_depth == 0 {
707            protocol_config.set_consensus_linearize_subdag_v2_for_testing(false);
708        }
709
710        for (index, _authority_info) in committee.authorities() {
711            let dir = TempDir::new().unwrap();
712            let (authority, receiver) = make_authority(
713                index,
714                &dir,
715                committee.clone(),
716                keypairs.clone(),
717                network_type,
718                boot_counters[index],
719                protocol_config.clone(),
720            )
721            .await;
722            assert!(
723                authority.sync_last_known_own_block_enabled(),
724                "Expected syncing of last known own block to be enabled as all authorities are of empty db and boot for first time."
725            );
726            boot_counters[index] += 1;
727            output_receivers.push(receiver);
728            authorities.insert(index, authority);
729            temp_dirs.insert(index, dir);
730        }
731
732        // Now we take the receiver of authority 1 and we wait until we see at least one
733        // block committed from this authority We wait until we see at least one
734        // committed block authored from this authority. That way we'll be 100% sure
735        // that at least one block has been proposed and successfully received
736        // by a quorum of nodes.
737        let index_1 = committee.to_authority_index(1).unwrap();
738        'outer: while let Some(result) =
739            timeout(Duration::from_secs(10), output_receivers[index_1].recv())
740                .await
741                .expect("Timed out while waiting for at least one committed block from authority 1")
742        {
743            for block in result.blocks {
744                if block.round() > GENESIS_ROUND && block.author() == index_1 {
745                    break 'outer;
746                }
747            }
748        }
749
750        // Stop authority 1 & 2.
751        // * Authority 1 will be used to wipe out their DB and practically "force" the
752        //   amnesia recovery.
753        // * Authority 2 is stopped in order to simulate less than f+1 availability
754        //   which will
755        // make authority 1 retry during amnesia recovery until it has finally managed
756        // to successfully get back f+1 responses. once authority 2 is up and
757        // running again.
758        authorities.remove(&index_1).unwrap().stop().await;
759        let index_2 = committee.to_authority_index(2).unwrap();
760        authorities.remove(&index_2).unwrap().stop().await;
761        sleep(Duration::from_secs(5)).await;
762
763        // Authority 1: create a new directory to simulate amnesia. The node will start
764        // having participated previously to consensus but now will attempt to
765        // synchronize the last own block and recover from there. It won't be able
766        // to do that successfully as authority 2 is still down.
767        let dir = TempDir::new().unwrap();
768        // We do reset the boot counter for this one to simulate a "binary" restart
769        boot_counters[index_1] = 0;
770        let (authority, mut receiver) = make_authority(
771            index_1,
772            &dir,
773            committee.clone(),
774            keypairs.clone(),
775            network_type,
776            boot_counters[index_1],
777            protocol_config.clone(),
778        )
779        .await;
780        assert!(
781            authority.sync_last_known_own_block_enabled(),
782            "Authority should have the sync of last own block enabled"
783        );
784        boot_counters[index_1] += 1;
785        authorities.insert(index_1, authority);
786        temp_dirs.insert(index_1, dir);
787        sleep(Duration::from_secs(5)).await;
788
789        // Now spin up authority 2 using its earlier directly - so no amnesia recovery
790        // should be forced here. Authority 1 should be able to recover from
791        // amnesia successfully.
792        let (authority, _receiver) = make_authority(
793            index_2,
794            &temp_dirs[&index_2],
795            committee.clone(),
796            keypairs,
797            network_type,
798            boot_counters[index_2],
799            protocol_config.clone(),
800        )
801        .await;
802        assert!(
803            !authority.sync_last_known_own_block_enabled(),
804            "Authority should not have attempted to sync the last own block"
805        );
806        boot_counters[index_2] += 1;
807        authorities.insert(index_2, authority);
808        sleep(Duration::from_secs(5)).await;
809
810        // We wait until we see at least one committed block authored from this
811        // authority
812        'outer: while let Some(result) = receiver.recv().await {
813            for block in result.blocks {
814                if block.round() > GENESIS_ROUND && block.author() == index_1 {
815                    break 'outer;
816                }
817            }
818        }
819
820        // Stop all authorities and exit.
821        for (_, authority) in authorities {
822            authority.stop().await;
823        }
824    }
825
826    // TODO: create a fixture
827    async fn make_authority(
828        index: AuthorityIndex,
829        db_dir: &TempDir,
830        committee: Committee,
831        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
832        network_type: ConsensusNetwork,
833        boot_counter: u64,
834        protocol_config: ProtocolConfig,
835    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
836        let registry = Registry::new();
837
838        // Cache less blocks to exercise commit sync.
839        let parameters = Parameters {
840            db_path: db_dir.path().to_path_buf(),
841            dag_state_cached_rounds: 5,
842            commit_sync_parallel_fetches: 2,
843            commit_sync_batch_size: 3,
844            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
845            ..Default::default()
846        };
847        let txn_verifier = NoopTransactionVerifier {};
848
849        let protocol_keypair = keypairs[index].1.clone();
850        let network_keypair = keypairs[index].0.clone();
851
852        let (sender, receiver) = unbounded_channel("consensus_output");
853        let commit_consumer = CommitConsumer::new(sender, 0);
854
855        let authority = ConsensusAuthority::start(
856            network_type,
857            index,
858            committee,
859            parameters,
860            protocol_config,
861            protocol_keypair,
862            network_keypair,
863            Arc::new(txn_verifier),
864            commit_consumer,
865            registry,
866            boot_counter,
867        )
868        .await;
869
870        (authority, receiver)
871    }
872}