iota_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet},
7    hash::Hash,
8    num::NonZeroUsize,
9    sync::Arc,
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CommitConsumerMonitor, CommitIndex};
15use iota_macros::{fail_point, fail_point_if};
16use iota_metrics::{monitored_mpsc::UnboundedReceiver, monitored_scope, spawn_monitored_task};
17use iota_types::{
18    authenticator_state::ActiveJwk,
19    base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest},
20    digests::ConsensusCommitDigest,
21    executable_transaction::{TrustedExecutableTransaction, VerifiedExecutableTransaction},
22    iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
23    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
24    transaction::{SenderSignedData, VerifiedTransaction},
25};
26use lru::LruCache;
27use serde::{Deserialize, Serialize};
28use tracing::{debug, info, instrument, trace_span, warn};
29
30use crate::{
31    authority::{
32        AuthorityMetrics, AuthorityState,
33        authority_per_epoch_store::{
34            AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndices,
35            ExecutionIndicesWithStats,
36        },
37        backpressure::{BackpressureManager, BackpressureSubscriber},
38        epoch_start_configuration::EpochStartConfigTrait,
39    },
40    checkpoints::{CheckpointService, CheckpointServiceNotify},
41    consensus_types::{AuthorityIndex, consensus_output_api::ConsensusOutputAPI},
42    execution_cache::{ObjectCacheRead, TransactionCacheRead},
43    scoring_decision::update_low_scoring_authorities,
44    transaction_manager::TransactionManager,
45};
46
47pub struct ConsensusHandlerInitializer {
48    state: Arc<AuthorityState>,
49    checkpoint_service: Arc<CheckpointService>,
50    epoch_store: Arc<AuthorityPerEpochStore>,
51    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
52    backpressure_manager: Arc<BackpressureManager>,
53}
54
55impl ConsensusHandlerInitializer {
56    pub fn new(
57        state: Arc<AuthorityState>,
58        checkpoint_service: Arc<CheckpointService>,
59        epoch_store: Arc<AuthorityPerEpochStore>,
60        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
61        backpressure_manager: Arc<BackpressureManager>,
62    ) -> Self {
63        Self {
64            state,
65            checkpoint_service,
66            epoch_store,
67            low_scoring_authorities,
68            backpressure_manager,
69        }
70    }
71
72    pub fn new_for_testing(
73        state: Arc<AuthorityState>,
74        checkpoint_service: Arc<CheckpointService>,
75    ) -> Self {
76        let backpressure_manager = BackpressureManager::new_for_tests();
77        Self {
78            state: state.clone(),
79            checkpoint_service,
80            epoch_store: state.epoch_store_for_testing().clone(),
81            low_scoring_authorities: Arc::new(Default::default()),
82            backpressure_manager,
83        }
84    }
85
86    pub fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
87        let new_epoch_start_state = self.epoch_store.epoch_start_state();
88        let consensus_committee = new_epoch_start_state.get_consensus_committee();
89
90        ConsensusHandler::new(
91            self.epoch_store.clone(),
92            self.checkpoint_service.clone(),
93            self.state.transaction_manager().clone(),
94            self.state.get_object_cache_reader().clone(),
95            self.state.get_transaction_cache_reader().clone(),
96            self.low_scoring_authorities.clone(),
97            consensus_committee,
98            self.state.metrics.clone(),
99            self.backpressure_manager.subscribe(),
100        )
101    }
102}
103
104pub struct ConsensusHandler<C> {
105    /// A store created for each epoch. ConsensusHandler is recreated each
106    /// epoch, with the corresponding store. This store is also used to get
107    /// the current epoch ID.
108    epoch_store: Arc<AuthorityPerEpochStore>,
109    /// Holds the indices, hash and stats after the last consensus commit
110    /// It is used for avoiding replaying already processed transactions,
111    /// checking chain consistency, and accumulating per-epoch consensus output
112    /// stats.
113    last_consensus_stats: ExecutionIndicesWithStats,
114    checkpoint_service: Arc<C>,
115    /// cache reader is needed when determining the next version to assign for
116    /// shared objects.
117    cache_reader: Arc<dyn ObjectCacheRead>,
118    /// used to read randomness transactions during crash recovery
119    tx_reader: Arc<dyn TransactionCacheRead>,
120    /// Reputation scores used by consensus adapter that we update, forwarded
121    /// from consensus
122    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
123    /// The consensus committee used to do stake computations for deciding set
124    /// of low scoring authorities
125    committee: ConsensusCommittee,
126    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
127    // a new metrics type here if we want to.
128    metrics: Arc<AuthorityMetrics>,
129    /// Lru cache to quickly discard transactions processed by consensus
130    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
131    transaction_scheduler: AsyncTransactionScheduler,
132
133    backpressure_subscriber: BackpressureSubscriber,
134}
135
136const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
137
138impl<C> ConsensusHandler<C> {
139    pub fn new(
140        epoch_store: Arc<AuthorityPerEpochStore>,
141        checkpoint_service: Arc<C>,
142        transaction_manager: Arc<TransactionManager>,
143        cache_reader: Arc<dyn ObjectCacheRead>,
144        tx_reader: Arc<dyn TransactionCacheRead>,
145        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
146        committee: ConsensusCommittee,
147        metrics: Arc<AuthorityMetrics>,
148        backpressure_subscriber: BackpressureSubscriber,
149    ) -> Self {
150        // Recover last_consensus_stats so it is consistent across validators.
151        let mut last_consensus_stats = epoch_store
152            .get_last_consensus_stats()
153            .expect("Should be able to read last consensus index");
154        // stats is empty at the beginning of epoch.
155        if !last_consensus_stats.stats.is_initialized() {
156            last_consensus_stats.stats = ConsensusStats::new(committee.size());
157        }
158        let transaction_scheduler =
159            AsyncTransactionScheduler::start(transaction_manager, epoch_store.clone());
160        Self {
161            epoch_store,
162            last_consensus_stats,
163            checkpoint_service,
164            cache_reader,
165            tx_reader,
166            low_scoring_authorities,
167            committee,
168            metrics,
169            processed_cache: LruCache::new(NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap()),
170            transaction_scheduler,
171            backpressure_subscriber,
172        }
173    }
174
175    /// Returns the last subdag index processed by the handler.
176    pub fn last_processed_subdag_index(&self) -> u64 {
177        self.last_consensus_stats.index.sub_dag_index
178    }
179}
180
181impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
182    /// Called during startup to allow us to observe commits we previously
183    /// processed, for crash recovery. Any state computed here must be a
184    /// pure function of the commits observed, it cannot depend on any state
185    /// recorded in the epoch db.
186    fn handle_prior_consensus_output(&mut self, consensus_commit: impl ConsensusOutputAPI) {
187        // TODO: this will be used to recover state computed from previous commits at
188        // startup.
189        let round = consensus_commit.leader_round();
190        info!("Ignoring prior consensus commit for round {:?}", round);
191    }
192
193    #[instrument("handle_consensus_output", level = "trace", skip_all)]
194    async fn handle_consensus_output(&mut self, consensus_output: impl ConsensusOutputAPI) {
195        // This may block until one of two conditions happens:
196        // - Number of uncommitted transactions in the writeback cache goes below the
197        //   backpressure threshold.
198        // - The highest executed checkpoint catches up to the highest certified
199        //   checkpoint.
200        self.backpressure_subscriber.await_no_backpressure().await;
201
202        let _scope = monitored_scope("HandleConsensusOutput");
203
204        let last_committed_round = self.last_consensus_stats.index.last_committed_round;
205
206        let round = consensus_output.leader_round();
207
208        // TODO: Is this check necessary? For now mysticeti will not
209        // return more than one leader per round so we are not in danger of
210        // ignoring any commits.
211        assert!(
212            round >= last_committed_round,
213            "Consensus output round {round} is less than last committed round {last_committed_round}"
214        );
215        if last_committed_round == round {
216            // we can receive the same commit twice after restart
217            // It is critical that the writes done by this function are atomic - otherwise
218            // we can lose the later parts of a commit if we restart midway
219            // through processing it.
220            warn!(
221                "Ignoring consensus output for round {} as it is already committed. NOTE: This is only expected if consensus is running.",
222                round
223            );
224            return;
225        }
226
227        // (serialized, transaction, output_cert)
228        let mut transactions = vec![];
229        let leader_author = consensus_output.leader_author_index();
230        let commit_sub_dag_index = consensus_output.commit_sub_dag_index();
231
232        debug!(
233            %consensus_output,
234            epoch = ?self.epoch_store.epoch(),
235            "Received consensus output"
236        );
237
238        let execution_index = ExecutionIndices {
239            last_committed_round: round,
240            sub_dag_index: commit_sub_dag_index,
241            transaction_index: 0_u64,
242        };
243        // This function has filtered out any already processed consensus output.
244        // So we can safely assume that the index is always increasing.
245        assert!(self.last_consensus_stats.index < execution_index);
246
247        // TODO: test empty commit explicitly.
248        // Note that consensus commit batch may contain no transactions, but we still
249        // need to record the current round and subdag index in the
250        // last_consensus_stats, so that it won't be re-executed in the future.
251        self.last_consensus_stats.index = execution_index;
252
253        // Load all jwks that became active in the previous round, and commit them in
254        // this round. We want to delay one round because none of the
255        // transactions in the previous round could have been authenticated with
256        // the jwks that became active in that round.
257        //
258        // Because of this delay, jwks that become active in the last round of the epoch
259        // will never be committed. That is ok, because in the new epoch, the
260        // validators should immediately re-submit these jwks, and they can
261        // become active then.
262        let new_jwks = self
263            .epoch_store
264            .get_new_jwks(last_committed_round)
265            .expect("Unrecoverable error in consensus handler");
266
267        if !new_jwks.is_empty() {
268            let authenticator_state_update_transaction =
269                self.authenticator_state_update_transaction(round, new_jwks);
270            debug!(
271                "adding AuthenticatorStateUpdateV1({:?}) tx: {:?}",
272                authenticator_state_update_transaction.digest(),
273                authenticator_state_update_transaction,
274            );
275
276            transactions.push((
277                SequencedConsensusTransactionKind::System(authenticator_state_update_transaction),
278                leader_author,
279            ));
280        }
281
282        update_low_scoring_authorities(
283            self.low_scoring_authorities.clone(),
284            self.epoch_store.committee(),
285            &self.committee,
286            consensus_output.reputation_score_sorted_desc(),
287            &self.metrics,
288            self.epoch_store
289                .protocol_config()
290                .consensus_bad_nodes_stake_threshold(),
291        );
292
293        self.metrics
294            .consensus_committed_subdags
295            .with_label_values(&[&leader_author.to_string()])
296            .inc();
297
298        for (authority_index, number_of_committed_headers) in
299            consensus_output.number_of_headers_in_commit_by_authority()
300        {
301            self.last_consensus_stats
302                .stats
303                .inc_num_messages(authority_index as usize, number_of_committed_headers);
304        }
305
306        {
307            let span = trace_span!("process_consensus_certs");
308            let _guard = span.enter();
309            for (authority_index, authority_transactions) in consensus_output.transactions() {
310                // TODO: consider only messages within 1~3 rounds of the leader?
311                for (transaction, serialized_len) in authority_transactions {
312                    let kind = classify(&transaction);
313                    self.metrics
314                        .consensus_handler_processed
315                        .with_label_values(&[kind])
316                        .inc();
317                    self.metrics
318                        .consensus_handler_transaction_sizes
319                        .with_label_values(&[kind])
320                        .observe(serialized_len as f64);
321                    if matches!(
322                        &transaction.kind,
323                        ConsensusTransactionKind::CertifiedTransaction(_)
324                    ) {
325                        self.last_consensus_stats
326                            .stats
327                            .inc_num_user_transactions(authority_index as usize);
328                    }
329                    let transaction = SequencedConsensusTransactionKind::External(transaction);
330                    transactions.push((transaction, authority_index));
331                }
332            }
333        }
334
335        for (i, authority) in self.committee.authorities() {
336            let hostname = &authority.hostname;
337            self.metrics
338                .consensus_committed_messages
339                .with_label_values(&[hostname])
340                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
341            self.metrics
342                .consensus_committed_user_transactions
343                .with_label_values(&[hostname])
344                .set(
345                    self.last_consensus_stats
346                        .stats
347                        .get_num_user_transactions(i.value()) as i64,
348                );
349        }
350
351        let mut all_transactions = Vec::new();
352        {
353            // We need a set here as well, since the processed_cache is a LRU cache and can
354            // drop entries while we're iterating over the sequenced
355            // transactions.
356            let mut processed_set = HashSet::new();
357
358            for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
359                // In process_consensus_transactions_and_commit_boundary(), we will add a system
360                // consensus commit prologue transaction, which will be the
361                // first transaction in this consensus commit batch. Therefore,
362                // the transaction sequence number starts from 1 here.
363                let current_tx_index = ExecutionIndices {
364                    last_committed_round: round,
365                    sub_dag_index: commit_sub_dag_index,
366                    transaction_index: (seq + 1) as u64,
367                };
368
369                self.last_consensus_stats.index = current_tx_index;
370
371                let certificate_author = *self
372                    .epoch_store
373                    .committee()
374                    .authority_by_index(cert_origin)
375                    .unwrap();
376
377                let sequenced_transaction = SequencedConsensusTransaction {
378                    certificate_author_index: cert_origin,
379                    certificate_author,
380                    consensus_index: current_tx_index,
381                    transaction,
382                };
383
384                let key = sequenced_transaction.key();
385                let in_set = !processed_set.insert(key);
386                let in_cache = self
387                    .processed_cache
388                    .put(sequenced_transaction.key(), ())
389                    .is_some();
390
391                if in_set || in_cache {
392                    self.metrics.skipped_consensus_txns_cache_hit.inc();
393                    continue;
394                }
395
396                all_transactions.push(sequenced_transaction);
397            }
398        }
399
400        let transactions_to_schedule = self
401            .epoch_store
402            .process_consensus_transactions_and_commit_boundary(
403                all_transactions,
404                &self.last_consensus_stats,
405                &self.checkpoint_service,
406                self.cache_reader.as_ref(),
407                self.tx_reader.as_ref(),
408                &ConsensusCommitInfo::new(&consensus_output),
409                &self.metrics,
410            )
411            .await
412            .expect("Unrecoverable error in consensus handler");
413
414        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
415            let key = [commit_sub_dag_index, self.epoch_store.epoch()];
416            if iota_simulator::random::deterministic_probability_once(key, 0.01) {
417                iota_simulator::task::kill_current_node(None);
418            }
419        });
420
421        fail_point!("crash"); // for tests that produce random crashes
422
423        self.transaction_scheduler
424            .schedule(transactions_to_schedule)
425            .await;
426    }
427}
428
429struct AsyncTransactionScheduler {
430    sender: tokio::sync::mpsc::Sender<Vec<VerifiedExecutableTransaction>>,
431}
432
433impl AsyncTransactionScheduler {
434    pub fn start(
435        transaction_manager: Arc<TransactionManager>,
436        epoch_store: Arc<AuthorityPerEpochStore>,
437    ) -> Self {
438        let (sender, recv) = tokio::sync::mpsc::channel(16);
439        spawn_monitored_task!(Self::run(recv, transaction_manager, epoch_store));
440        Self { sender }
441    }
442
443    pub async fn schedule(&self, transactions: Vec<VerifiedExecutableTransaction>) {
444        tracing::trace_span!("transaction_scheduler_enqueue");
445        self.sender.send(transactions).await.ok();
446    }
447
448    pub async fn run(
449        mut recv: tokio::sync::mpsc::Receiver<Vec<VerifiedExecutableTransaction>>,
450        transaction_manager: Arc<TransactionManager>,
451        epoch_store: Arc<AuthorityPerEpochStore>,
452    ) {
453        while let Some(transactions) = recv.recv().await {
454            let _guard = monitored_scope("ConsensusHandler::enqueue");
455            transaction_manager.enqueue(transactions, &epoch_store);
456        }
457    }
458}
459
460/// Consensus handler used by Mysticeti. Since Mysticeti repo is not yet
461/// integrated, we use a channel to receive the consensus output from Mysticeti.
462/// During initialization, the sender is passed into Mysticeti which can send
463/// consensus output to the channel.
464pub struct MysticetiConsensusHandler {
465    handle: Option<tokio::task::JoinHandle<()>>,
466}
467
468impl MysticetiConsensusHandler {
469    pub fn new(
470        last_processed_commit_at_startup: CommitIndex,
471        mut consensus_handler: ConsensusHandler<CheckpointService>,
472        mut receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
473        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
474    ) -> Self {
475        let handle = spawn_monitored_task!(async move {
476            // TODO: pause when execution is overloaded, so consensus can detect the
477            // backpressure.
478            while let Some(consensus_output) = receiver.recv().await {
479                let commit_index = consensus_output.commit_ref.index;
480                if commit_index <= last_processed_commit_at_startup {
481                    consensus_handler.handle_prior_consensus_output(consensus_output);
482                } else {
483                    consensus_handler
484                        .handle_consensus_output(consensus_output)
485                        .await;
486                    commit_consumer_monitor.set_highest_handled_commit(commit_index);
487                }
488            }
489        });
490        Self {
491            handle: Some(handle),
492        }
493    }
494
495    pub async fn abort(&mut self) {
496        if let Some(handle) = self.handle.take() {
497            handle.abort();
498            let _ = handle.await;
499        }
500    }
501}
502
503impl Drop for MysticetiConsensusHandler {
504    fn drop(&mut self) {
505        if let Some(handle) = self.handle.take() {
506            handle.abort();
507        }
508    }
509}
510
511/// Consensus handler used by Starfish.
512/// During initialization, the sender is passed into Starfish which can send
513/// consensus output to the channel.
514pub struct StarfishConsensusHandler {
515    handle: Option<tokio::task::JoinHandle<()>>,
516}
517
518impl StarfishConsensusHandler {
519    pub fn new(
520        last_processed_commit_at_startup: starfish_core::CommitIndex,
521        mut consensus_handler: ConsensusHandler<CheckpointService>,
522        mut receiver: UnboundedReceiver<starfish_core::CommittedSubDag>,
523        commit_consumer_monitor: Arc<starfish_core::CommitConsumerMonitor>,
524    ) -> Self {
525        let handle = spawn_monitored_task!(async move {
526            // TODO: pause when execution is overloaded, so consensus can detect the
527            // backpressure.
528            while let Some(consensus_output) = receiver.recv().await {
529                let commit_index = consensus_output.commit_ref.index;
530                if commit_index <= last_processed_commit_at_startup {
531                    consensus_handler.handle_prior_consensus_output(consensus_output);
532                } else {
533                    consensus_handler
534                        .handle_consensus_output(consensus_output)
535                        .await;
536                    commit_consumer_monitor.set_highest_handled_commit(commit_index);
537                }
538            }
539        });
540        Self {
541            handle: Some(handle),
542        }
543    }
544
545    pub async fn abort(&mut self) {
546        if let Some(handle) = self.handle.take() {
547            handle.abort();
548            let _ = handle.await;
549        }
550    }
551}
552
553impl Drop for StarfishConsensusHandler {
554    fn drop(&mut self) {
555        if let Some(handle) = self.handle.take() {
556            handle.abort();
557        }
558    }
559}
560
561impl<C> ConsensusHandler<C> {
562    fn authenticator_state_update_transaction(
563        &self,
564        round: u64,
565        mut new_active_jwks: Vec<ActiveJwk>,
566    ) -> VerifiedExecutableTransaction {
567        new_active_jwks.sort();
568
569        info!("creating authenticator state update transaction");
570        assert!(self.epoch_store.authenticator_state_enabled());
571        let transaction = VerifiedTransaction::new_authenticator_state_update(
572            self.epoch(),
573            round,
574            new_active_jwks,
575            self.epoch_store
576                .epoch_start_config()
577                .authenticator_obj_initial_shared_version()
578                .expect("authenticator state obj must exist"),
579        );
580        VerifiedExecutableTransaction::new_system(transaction, self.epoch())
581    }
582
583    fn epoch(&self) -> EpochId {
584        self.epoch_store.epoch()
585    }
586}
587
588pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
589    match &transaction.kind {
590        ConsensusTransactionKind::CertifiedTransaction(certificate) => {
591            if certificate.contains_shared_object() {
592                "shared_certificate"
593            } else {
594                "owned_certificate"
595            }
596        }
597        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
598        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
599        ConsensusTransactionKind::CapabilityNotificationV1(_) => "capability_notification_v1",
600        ConsensusTransactionKind::SignedCapabilityNotificationV1(_) => {
601            "signed_capability_notification_v1"
602        }
603        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
604        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
605        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
606    }
607}
608
/// A transaction together with the position consensus assigned to it and the
/// authority it is attributed to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Index of the authority the transaction is attributed to.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the authority the transaction is attributed to; this is what
    /// `sender_authority()` returns.
    pub certificate_author: AuthorityName,
    /// Execution indices (round, subdag index, transaction index) assigned to
    /// this transaction.
    pub consensus_index: ExecutionIndices,
    /// The transaction itself: either received from consensus (`External`) or
    /// generated locally (`System`).
    pub transaction: SequencedConsensusTransactionKind,
}
616
617#[derive(Debug, Clone)]
618pub enum SequencedConsensusTransactionKind {
619    External(ConsensusTransaction),
620    System(VerifiedExecutableTransaction),
621}
622
623impl Serialize for SequencedConsensusTransactionKind {
624    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
625        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
626        serializable.serialize(serializer)
627    }
628}
629
630impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
631    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
632        let serializable =
633            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
634        Ok(serializable.into())
635    }
636}
637
638// We can't serialize SequencedConsensusTransactionKind directly because it
639// contains a VerifiedExecutableTransaction, which is not serializable (by
640// design). This wrapper allows us to convert to a serializable format easily.
641#[derive(Debug, Clone, Serialize, Deserialize)]
642enum SerializableSequencedConsensusTransactionKind {
643    External(Box<ConsensusTransaction>),
644    System(Box<TrustedExecutableTransaction>),
645}
646
647impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
648    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
649        match kind {
650            SequencedConsensusTransactionKind::External(ext) => {
651                SerializableSequencedConsensusTransactionKind::External(Box::new(ext.clone()))
652            }
653            SequencedConsensusTransactionKind::System(txn) => {
654                SerializableSequencedConsensusTransactionKind::System(Box::new(
655                    txn.clone().serializable(),
656                ))
657            }
658        }
659    }
660}
661
662impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
663    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
664        match kind {
665            SerializableSequencedConsensusTransactionKind::External(ext) => {
666                SequencedConsensusTransactionKind::External(*ext)
667            }
668            SerializableSequencedConsensusTransactionKind::System(txn) => {
669                SequencedConsensusTransactionKind::System((*txn).into())
670            }
671        }
672    }
673}
674
/// Key identifying a sequenced consensus transaction, as returned by
/// `SequencedConsensusTransactionKind::key()`.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    /// Key of an external consensus transaction.
    External(ConsensusTransactionKey),
    /// Digest of a system transaction.
    System(TransactionDigest),
}
680
681impl SequencedConsensusTransactionKind {
682    pub fn key(&self) -> SequencedConsensusTransactionKey {
683        match self {
684            SequencedConsensusTransactionKind::External(ext) => {
685                SequencedConsensusTransactionKey::External(ext.key())
686            }
687            SequencedConsensusTransactionKind::System(txn) => {
688                SequencedConsensusTransactionKey::System(*txn.digest())
689            }
690        }
691    }
692
693    pub fn get_tracking_id(&self) -> u64 {
694        match self {
695            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
696            SequencedConsensusTransactionKind::System(_txn) => 0,
697        }
698    }
699
700    pub fn is_executable_transaction(&self) -> bool {
701        match self {
702            SequencedConsensusTransactionKind::External(ext) => ext.is_user_certificate(),
703            SequencedConsensusTransactionKind::System(_) => true,
704        }
705    }
706
707    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
708        match self {
709            SequencedConsensusTransactionKind::External(ext) => {
710                if let ConsensusTransactionKind::CertifiedTransaction(txn) = &ext.kind {
711                    Some(*txn.digest())
712                } else {
713                    None
714                }
715            }
716            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
717        }
718    }
719
720    pub fn is_end_of_publish(&self) -> bool {
721        match self {
722            SequencedConsensusTransactionKind::External(ext) => {
723                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
724            }
725            SequencedConsensusTransactionKind::System(_) => false,
726        }
727    }
728}
729
730impl SequencedConsensusTransaction {
731    pub fn sender_authority(&self) -> AuthorityName {
732        self.certificate_author
733    }
734
735    pub fn key(&self) -> SequencedConsensusTransactionKey {
736        self.transaction.key()
737    }
738
739    pub fn is_end_of_publish(&self) -> bool {
740        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
741            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
742        } else {
743            false
744        }
745    }
746
747    pub fn is_system(&self) -> bool {
748        matches!(
749            self.transaction,
750            SequencedConsensusTransactionKind::System(_)
751        )
752    }
753
754    pub fn is_user_tx_with_randomness(&self) -> bool {
755        let SequencedConsensusTransactionKind::External(ConsensusTransaction {
756            kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
757            ..
758        }) = &self.transaction
759        else {
760            return false;
761        };
762        certificate.transaction_data().uses_randomness()
763    }
764
765    pub fn as_shared_object_txn(&self) -> Option<&SenderSignedData> {
766        match &self.transaction {
767            SequencedConsensusTransactionKind::External(ConsensusTransaction {
768                kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
769                ..
770            }) if certificate.contains_shared_object() => Some(certificate.data()),
771            SequencedConsensusTransactionKind::System(txn) if txn.contains_shared_object() => {
772                Some(txn.data())
773            }
774            _ => None,
775        }
776    }
777}
778
779#[derive(Debug, Clone, Serialize, Deserialize)]
780pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
781
782#[cfg(test)]
783impl VerifiedSequencedConsensusTransaction {
784    pub fn new_test(transaction: ConsensusTransaction) -> Self {
785        Self(SequencedConsensusTransaction::new_test(transaction))
786    }
787}
788
789impl SequencedConsensusTransaction {
790    pub fn new_test(transaction: ConsensusTransaction) -> Self {
791        Self {
792            certificate_author_index: 0,
793            certificate_author: AuthorityName::ZERO,
794            consensus_index: Default::default(),
795            transaction: SequencedConsensusTransactionKind::External(transaction),
796        }
797    }
798}
799
800/// Represents the information from the current consensus commit.
801pub struct ConsensusCommitInfo {
802    pub round: u64,
803    pub timestamp: u64,
804    pub consensus_commit_digest: ConsensusCommitDigest,
805
806    skip_consensus_commit_prologue_in_test: bool,
807}
808
809impl ConsensusCommitInfo {
810    fn new(consensus_output: &impl ConsensusOutputAPI) -> Self {
811        Self {
812            round: consensus_output.leader_round(),
813            timestamp: consensus_output.commit_timestamp_ms(),
814            consensus_commit_digest: consensus_output.consensus_digest(),
815
816            skip_consensus_commit_prologue_in_test: false,
817        }
818    }
819
820    pub fn new_for_test(
821        commit_round: u64,
822        commit_timestamp: u64,
823        skip_consensus_commit_prologue_in_test: bool,
824    ) -> Self {
825        Self {
826            round: commit_round,
827            timestamp: commit_timestamp,
828            consensus_commit_digest: ConsensusCommitDigest::default(),
829            skip_consensus_commit_prologue_in_test,
830        }
831    }
832
833    pub fn skip_consensus_commit_prologue_in_test(&self) -> bool {
834        self.skip_consensus_commit_prologue_in_test
835    }
836
837    fn consensus_commit_prologue_v1_transaction(
838        &self,
839        epoch: u64,
840        cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
841    ) -> VerifiedExecutableTransaction {
842        let transaction = VerifiedTransaction::new_consensus_commit_prologue_v1(
843            epoch,
844            self.round,
845            self.timestamp,
846            self.consensus_commit_digest,
847            cancelled_txn_version_assignment,
848        );
849        VerifiedExecutableTransaction::new_system(transaction, epoch)
850    }
851
852    pub fn create_consensus_commit_prologue_transaction(
853        &self,
854        epoch: u64,
855        cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
856    ) -> VerifiedExecutableTransaction {
857        self.consensus_commit_prologue_v1_transaction(epoch, cancelled_txn_version_assignment)
858    }
859}
860
861#[cfg(test)]
862mod tests {
863    use consensus_core::{
864        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
865    };
866    use futures::pin_mut;
867    use iota_protocol_config::{Chain, ConsensusTransactionOrdering};
868    use iota_types::{
869        base_types::{AuthorityName, IotaAddress, random_object_ref},
870        committee::Committee,
871        messages_consensus::{
872            AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
873        },
874        object::Object,
875        supported_protocol_versions::{
876            SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
877        },
878        transaction::{
879            CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
880        },
881    };
882    use prometheus::Registry;
883
884    use super::*;
885    use crate::{
886        authority::{
887            authority_per_epoch_store::ConsensusStatsAPI,
888            test_authority_builder::TestAuthorityBuilder,
889        },
890        checkpoints::CheckpointServiceNoop,
891        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
892        post_consensus_tx_reorder::PostConsensusTxReorder,
893    };
894
895    #[tokio::test(flavor = "current_thread", start_paused = true)]
896    pub async fn test_consensus_handler() {
897        // GIVEN
898        let mut objects = test_gas_objects();
899        let shared_object = Object::shared_for_testing();
900        objects.push(shared_object.clone());
901
902        let network_config =
903            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
904                .with_objects(objects.clone())
905                .build();
906
907        let state = TestAuthorityBuilder::new()
908            .with_network_config(&network_config, 0)
909            .build()
910            .await;
911
912        let epoch_store = state.epoch_store_for_testing().clone();
913        let new_epoch_start_state = epoch_store.epoch_start_state();
914        let consensus_committee = new_epoch_start_state.get_consensus_committee();
915
916        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
917
918        let backpressure_manager = BackpressureManager::new_for_tests();
919
920        let mut consensus_handler = ConsensusHandler::new(
921            epoch_store,
922            Arc::new(CheckpointServiceNoop {}),
923            state.transaction_manager().clone(),
924            state.get_object_cache_reader().clone(),
925            state.get_transaction_cache_reader().clone(),
926            Arc::new(ArcSwap::default()),
927            consensus_committee.clone(),
928            metrics,
929            backpressure_manager.subscribe(),
930        );
931
932        // AND
933        // Create test transactions
934        let transactions = test_certificates(&state, shared_object).await;
935        let mut blocks = Vec::new();
936
937        for (i, transaction) in transactions.iter().enumerate() {
938            let transaction_bytes: Vec<u8> = bcs::to_bytes(
939                &ConsensusTransaction::new_certificate_message(&state.name, transaction.clone()),
940            )
941            .unwrap();
942
943            // AND create block for each transaction
944            let block = VerifiedBlock::new_for_test(
945                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
946                    .set_transactions(vec![Transaction::new(transaction_bytes)])
947                    .build(),
948            );
949
950            blocks.push(block);
951        }
952
953        // AND create the consensus output
954        let leader_block = blocks[0].clone();
955        let committed_sub_dag = CommittedSubDag::new(
956            leader_block.reference(),
957            blocks.clone(),
958            leader_block.timestamp_ms(),
959            CommitRef::new(10, CommitDigest::MIN),
960            vec![],
961        );
962
963        // Test that the consensus handler respects backpressure.
964        backpressure_manager.set_backpressure(true);
965        // Default watermarks are 0,0 which will suppress the backpressure.
966        backpressure_manager.update_highest_certified_checkpoint(1);
967
968        // AND processing the consensus output once
969        {
970            let waiter = consensus_handler.handle_consensus_output(committed_sub_dag.clone());
971            pin_mut!(waiter);
972
973            // waiter should not complete within 5 seconds
974            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
975                .await
976                .unwrap_err();
977
978            // lift backpressure
979            backpressure_manager.set_backpressure(false);
980
981            // waiter completes now.
982            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
983                .await
984                .unwrap();
985        }
986
987        // AND capturing the consensus stats
988        let num_blocks = blocks.len();
989        let num_transactions = transactions.len();
990        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
991        assert_eq!(
992            last_consensus_stats_1.index.transaction_index,
993            num_transactions as u64
994        );
995        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
996        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
997        assert_eq!(last_consensus_stats_1.hash, 0);
998        assert_eq!(
999            last_consensus_stats_1.stats.get_num_messages(0),
1000            num_blocks as u64
1001        );
1002        assert_eq!(
1003            last_consensus_stats_1.stats.get_num_user_transactions(0),
1004            num_transactions as u64
1005        );
1006
1007        // WHEN processing the same output multiple times
1008        // THEN the consensus stats do not update
1009        for _ in 0..2 {
1010            consensus_handler
1011                .handle_consensus_output(committed_sub_dag.clone())
1012                .await;
1013            let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone();
1014            assert_eq!(last_consensus_stats_1, last_consensus_stats_2);
1015        }
1016    }
1017
/// Checks `PostConsensusTxReorder::reorder` under
/// `ConsensusTransactionOrdering::ByGasPrice` against three batches.
#[test]
fn test_order_by_gas_price() {
    // Reorders a batch by gas price and renders every entry as a label.
    let reorder_by_gas_price = |mut batch: Vec<VerifiedSequencedConsensusTransaction>| {
        PostConsensusTxReorder::reorder(&mut batch, ConsensusTransactionOrdering::ByGasPrice);
        extract(batch)
    };
    let chain = Chain::Unknown;

    // Expected: capability messages first, in their original relative order,
    // followed by user transactions in descending gas price.
    let small_batch = vec![
        cap_txn(10, chain),
        user_txn(42),
        user_txn(100),
        cap_txn(1, chain),
    ];
    assert_eq!(
        reorder_by_gas_price(small_batch),
        ["cap(10)", "cap(1)", "user(100)", "user(42)"]
    );

    // Same expectation on a larger batch that includes two equal gas prices.
    let large_batch = vec![
        user_txn(1200),
        cap_txn(10, chain),
        user_txn(12),
        user_txn(1000),
        user_txn(42),
        user_txn(100),
        cap_txn(1, chain),
        user_txn(1000),
    ];
    assert_eq!(
        reorder_by_gas_price(large_batch),
        [
            "cap(10)",
            "cap(1)",
            "user(1200)",
            "user(1000)",
            "user(1000)",
            "user(100)",
            "user(42)",
            "user(12)",
        ]
    );

    // If there are no user transactions, the order should be preserved.
    let no_user_batch = vec![
        cap_txn(10, chain),
        eop_txn(12),
        eop_txn(10),
        cap_txn(1, chain),
        eop_txn(11),
    ];
    assert_eq!(
        reorder_by_gas_price(no_user_batch),
        ["cap(10)", "eop(12)", "eop(10)", "cap(1)", "eop(11)"]
    );
}
1083
1084    fn extract(v: Vec<VerifiedSequencedConsensusTransaction>) -> Vec<String> {
1085        v.into_iter().map(extract_one).collect()
1086    }
1087
1088    fn extract_one(t: VerifiedSequencedConsensusTransaction) -> String {
1089        match t.0.transaction {
1090            SequencedConsensusTransactionKind::External(ext) => match ext.kind {
1091                ConsensusTransactionKind::EndOfPublish(authority) => {
1092                    format!("eop({})", authority.0[0])
1093                }
1094                ConsensusTransactionKind::CapabilityNotificationV1(cap) => {
1095                    format!("cap({})", cap.generation)
1096                }
1097                ConsensusTransactionKind::CertifiedTransaction(txn) => {
1098                    format!("user({})", txn.transaction_data().gas_price())
1099                }
1100                _ => unreachable!(),
1101            },
1102            SequencedConsensusTransactionKind::System(_) => unreachable!(),
1103        }
1104    }
1105
1106    fn eop_txn(a: u8) -> VerifiedSequencedConsensusTransaction {
1107        let mut authority = AuthorityName::default();
1108        authority.0[0] = a;
1109        txn(ConsensusTransactionKind::EndOfPublish(authority))
1110    }
1111
1112    fn cap_txn(generation: u64, chain: Chain) -> VerifiedSequencedConsensusTransaction {
1113        txn(ConsensusTransactionKind::CapabilityNotificationV1(
1114            // we don't use the "new" constructor because we need to set the generation
1115            AuthorityCapabilitiesV1 {
1116                authority: Default::default(),
1117                generation,
1118                supported_protocol_versions:
1119                    SupportedProtocolVersionsWithHashes::from_supported_versions(
1120                        SupportedProtocolVersions::SYSTEM_DEFAULT,
1121                        chain,
1122                    ),
1123                available_system_packages: vec![],
1124            },
1125        ))
1126    }
1127
1128    fn user_txn(gas_price: u64) -> VerifiedSequencedConsensusTransaction {
1129        let (committee, keypairs) = Committee::new_simple_test_committee();
1130        let data = SenderSignedData::new(
1131            TransactionData::new_transfer(
1132                IotaAddress::default(),
1133                random_object_ref(),
1134                IotaAddress::default(),
1135                random_object_ref(),
1136                1000 * gas_price,
1137                gas_price,
1138            ),
1139            vec![],
1140        );
1141        txn(ConsensusTransactionKind::CertifiedTransaction(Box::new(
1142            CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
1143        )))
1144    }
1145
1146    fn txn(kind: ConsensusTransactionKind) -> VerifiedSequencedConsensusTransaction {
1147        VerifiedSequencedConsensusTransaction::new_test(ConsensusTransaction {
1148            kind,
1149            tracking_id: Default::default(),
1150        })
1151    }
1152}