iota_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet},
7    hash::Hash,
8    num::NonZeroUsize,
9    sync::Arc,
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::CommitConsumerMonitor;
15use iota_macros::{fail_point_async, fail_point_if};
16use iota_metrics::{monitored_mpsc::UnboundedReceiver, monitored_scope, spawn_monitored_task};
17use iota_types::{
18    authenticator_state::ActiveJwk,
19    base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest},
20    digests::ConsensusCommitDigest,
21    executable_transaction::{TrustedExecutableTransaction, VerifiedExecutableTransaction},
22    iota_system_state::epoch_start_iota_system_state::EpochStartSystemStateTrait,
23    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
24    transaction::{SenderSignedData, VerifiedTransaction},
25};
26use lru::LruCache;
27use serde::{Deserialize, Serialize};
28use tracing::{debug, info, instrument, trace_span, warn};
29
30use crate::{
31    authority::{
32        AuthorityMetrics, AuthorityState,
33        authority_per_epoch_store::{
34            AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndices,
35            ExecutionIndicesWithStats,
36        },
37        epoch_start_configuration::EpochStartConfigTrait,
38    },
39    checkpoints::{CheckpointService, CheckpointServiceNotify},
40    consensus_types::{AuthorityIndex, consensus_output_api::ConsensusOutputAPI},
41    execution_cache::ObjectCacheRead,
42    scoring_decision::update_low_scoring_authorities,
43    transaction_manager::TransactionManager,
44};
45
46pub struct ConsensusHandlerInitializer {
47    state: Arc<AuthorityState>,
48    checkpoint_service: Arc<CheckpointService>,
49    epoch_store: Arc<AuthorityPerEpochStore>,
50    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
51}
52
53impl ConsensusHandlerInitializer {
54    pub fn new(
55        state: Arc<AuthorityState>,
56        checkpoint_service: Arc<CheckpointService>,
57        epoch_store: Arc<AuthorityPerEpochStore>,
58        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
59    ) -> Self {
60        Self {
61            state,
62            checkpoint_service,
63            epoch_store,
64            low_scoring_authorities,
65        }
66    }
67
68    pub fn new_for_testing(
69        state: Arc<AuthorityState>,
70        checkpoint_service: Arc<CheckpointService>,
71    ) -> Self {
72        Self {
73            state: state.clone(),
74            checkpoint_service,
75            epoch_store: state.epoch_store_for_testing().clone(),
76            low_scoring_authorities: Arc::new(Default::default()),
77        }
78    }
79
80    pub fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
81        let new_epoch_start_state = self.epoch_store.epoch_start_state();
82        let consensus_committee = new_epoch_start_state.get_consensus_committee();
83
84        ConsensusHandler::new(
85            self.epoch_store.clone(),
86            self.checkpoint_service.clone(),
87            self.state.transaction_manager().clone(),
88            self.state.get_object_cache_reader().clone(),
89            self.low_scoring_authorities.clone(),
90            consensus_committee,
91            self.state.metrics.clone(),
92        )
93    }
94}
95
96pub struct ConsensusHandler<C> {
97    /// A store created for each epoch. ConsensusHandler is recreated each
98    /// epoch, with the corresponding store. This store is also used to get
99    /// the current epoch ID.
100    epoch_store: Arc<AuthorityPerEpochStore>,
101    /// Holds the indices, hash and stats after the last consensus commit
102    /// It is used for avoiding replaying already processed transactions,
103    /// checking chain consistency, and accumulating per-epoch consensus output
104    /// stats.
105    last_consensus_stats: ExecutionIndicesWithStats,
106    checkpoint_service: Arc<C>,
107    /// cache reader is needed when determining the next version to assign for
108    /// shared objects.
109    cache_reader: Arc<dyn ObjectCacheRead>,
110    /// Reputation scores used by consensus adapter that we update, forwarded
111    /// from consensus
112    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
113    /// The consensus committee used to do stake computations for deciding set
114    /// of low scoring authorities
115    committee: ConsensusCommittee,
116    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
117    // a new metrics type here if we want to.
118    metrics: Arc<AuthorityMetrics>,
119    /// Lru cache to quickly discard transactions processed by consensus
120    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
121    transaction_scheduler: AsyncTransactionScheduler,
122}
123
124const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
125
126impl<C> ConsensusHandler<C> {
127    pub fn new(
128        epoch_store: Arc<AuthorityPerEpochStore>,
129        checkpoint_service: Arc<C>,
130        transaction_manager: Arc<TransactionManager>,
131        cache_reader: Arc<dyn ObjectCacheRead>,
132        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
133        committee: ConsensusCommittee,
134        metrics: Arc<AuthorityMetrics>,
135    ) -> Self {
136        // Recover last_consensus_stats so it is consistent across validators.
137        let mut last_consensus_stats = epoch_store
138            .get_last_consensus_stats()
139            .expect("Should be able to read last consensus index");
140        // stats is empty at the beginning of epoch.
141        if !last_consensus_stats.stats.is_initialized() {
142            last_consensus_stats.stats = ConsensusStats::new(committee.size());
143        }
144        let transaction_scheduler =
145            AsyncTransactionScheduler::start(transaction_manager, epoch_store.clone());
146        Self {
147            epoch_store,
148            last_consensus_stats,
149            checkpoint_service,
150            cache_reader,
151            low_scoring_authorities,
152            committee,
153            metrics,
154            processed_cache: LruCache::new(NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap()),
155            transaction_scheduler,
156        }
157    }
158
159    /// Returns the last subdag index processed by the handler.
160    pub fn last_processed_subdag_index(&self) -> u64 {
161        self.last_consensus_stats.index.sub_dag_index
162    }
163}
164
impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
    /// Processes a single consensus commit.
    ///
    /// A commit whose leader round equals the last committed round is logged
    /// and ignored. Otherwise this method, in order:
    ///
    /// 1. records the commit's execution index in `last_consensus_stats`;
    /// 2. if new JWKs became active in the previous committed round, queues
    ///    an authenticator state update system transaction ahead of the
    ///    commit's own transactions;
    /// 3. updates the low scoring authorities and the per-authority stats
    ///    and metrics;
    /// 4. assigns an execution index to every transaction and drops those
    ///    whose key was already seen;
    /// 5. passes the remaining transactions to the epoch store and hands the
    ///    resulting executable transactions to the transaction scheduler.
    ///
    /// # Panics
    ///
    /// Panics if the leader round is lower than the last committed round, if
    /// the new execution index is not greater than the last recorded one, if
    /// an authority index is not found in the committee, or if the epoch
    /// store reports an error.
    #[instrument(level = "debug", skip_all)]
    async fn handle_consensus_output(&mut self, consensus_output: impl ConsensusOutputAPI) {
        let _scope = monitored_scope("HandleConsensusOutput");

        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        let round = consensus_output.leader_round();

        // TODO: Is this check necessary? For now mysticeti will not
        // return more than one leader per round so we are not in danger of
        // ignoring any commits.
        assert!(round >= last_committed_round);
        if last_committed_round == round {
            // we can receive the same commit twice after restart
            // It is critical that the writes done by this function are atomic - otherwise
            // we can lose the later parts of a commit if we restart midway
            // through processing it.
            warn!(
                "Ignoring consensus output for round {} as it is already committed. NOTE: This is only expected if consensus is running.",
                round
            );
            return;
        }

        // Pairs of (transaction kind, index of the authority the transaction is
        // attributed to), in the order they will be sequenced.
        let mut transactions = vec![];
        let leader_author = consensus_output.leader_author_index();
        let commit_sub_dag_index = consensus_output.commit_sub_dag_index();

        debug!(
            %consensus_output,
            epoch = ?self.epoch_store.epoch(),
            "Received consensus output"
        );

        let execution_index = ExecutionIndices {
            last_committed_round: round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };
        // This function has filtered out any already processed consensus output.
        // So we can safely assume that the index is always increasing.
        assert!(self.last_consensus_stats.index < execution_index);

        // TODO: test empty commit explicitly.
        // Note that consensus commit batch may contain no transactions, but we still
        // need to record the current round and subdag index in the
        // last_consensus_stats, so that it won't be re-executed in the future.
        self.last_consensus_stats.index = execution_index;

        // Load all jwks that became active in the previous round, and commit them in
        // this round. We want to delay one round because none of the
        // transactions in the previous round could have been authenticated with
        // the jwks that became active in that round.
        //
        // Because of this delay, jwks that become active in the last round of the epoch
        // will never be committed. That is ok, because in the new epoch, the
        // validators should immediately re-submit these jwks, and they can
        // become active then.
        let new_jwks = self
            .epoch_store
            .get_new_jwks(last_committed_round)
            .expect("Unrecoverable error in consensus handler");

        if !new_jwks.is_empty() {
            let authenticator_state_update_transaction =
                self.authenticator_state_update_transaction(round, new_jwks);
            debug!(
                "adding AuthenticatorStateUpdateV1({:?}) tx: {:?}",
                authenticator_state_update_transaction.digest(),
                authenticator_state_update_transaction,
            );

            // The system transaction is attributed to the commit leader and is
            // queued before any transaction from the commit itself.
            transactions.push((
                SequencedConsensusTransactionKind::System(authenticator_state_update_transaction),
                leader_author,
            ));
        }

        update_low_scoring_authorities(
            self.low_scoring_authorities.clone(),
            self.epoch_store.committee(),
            &self.committee,
            consensus_output.reputation_score_sorted_desc(),
            &self.metrics,
            self.epoch_store
                .protocol_config()
                .consensus_bad_nodes_stake_threshold(),
        );

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        {
            let span = trace_span!("process_consensus_certs");
            let _guard = span.enter();
            for (authority_index, authority_transactions) in consensus_output.transactions() {
                // TODO: consider only messages within 1~3 rounds of the leader?
                self.last_consensus_stats
                    .stats
                    .inc_num_messages(authority_index as usize);
                for (transaction, serialized_len) in authority_transactions {
                    let kind = classify(&transaction);
                    self.metrics
                        .consensus_handler_processed
                        .with_label_values(&[kind])
                        .inc();
                    self.metrics
                        .consensus_handler_transaction_sizes
                        .with_label_values(&[kind])
                        .observe(serialized_len as f64);
                    // Only certified transactions count towards the
                    // per-authority user transaction stats.
                    if matches!(
                        &transaction.kind,
                        ConsensusTransactionKind::CertifiedTransaction(_)
                    ) {
                        self.last_consensus_stats
                            .stats
                            .inc_num_user_transactions(authority_index as usize);
                    }
                    let transaction = SequencedConsensusTransactionKind::External(transaction);
                    transactions.push((transaction, authority_index));
                }
            }
        }

        // Publish the accumulated per-authority counters, labelled by hostname.
        for (i, authority) in self.committee.authorities() {
            let hostname = &authority.hostname;
            self.metrics
                .consensus_committed_messages
                .with_label_values(&[hostname])
                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
            self.metrics
                .consensus_committed_user_transactions
                .with_label_values(&[hostname])
                .set(
                    self.last_consensus_stats
                        .stats
                        .get_num_user_transactions(i.value()) as i64,
                );
        }

        let mut all_transactions = Vec::new();
        {
            // We need a set here as well, since the processed_cache is a LRU cache and can
            // drop entries while we're iterating over the sequenced
            // transactions.
            let mut processed_set = HashSet::new();

            for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
                // In process_consensus_transactions_and_commit_boundary(), we will add a system
                // consensus commit prologue transaction, which will be the
                // first transaction in this consensus commit batch. Therefore,
                // the transaction sequence number starts from 1 here.
                let current_tx_index = ExecutionIndices {
                    last_committed_round: round,
                    sub_dag_index: commit_sub_dag_index,
                    transaction_index: (seq + 1) as u64,
                };

                // The index advances for every transaction, including the ones
                // skipped as duplicates below.
                self.last_consensus_stats.index = current_tx_index;

                let certificate_author = *self
                    .epoch_store
                    .committee()
                    .authority_by_index(cert_origin)
                    .unwrap();

                let sequenced_transaction = SequencedConsensusTransaction {
                    certificate_author_index: cert_origin,
                    certificate_author,
                    consensus_index: current_tx_index,
                    transaction,
                };

                // Both checks run unconditionally, so the key is always recorded
                // in the set and in the LRU cache, even for duplicates.
                let key = sequenced_transaction.key();
                let in_set = !processed_set.insert(key);
                let in_cache = self
                    .processed_cache
                    .put(sequenced_transaction.key(), ())
                    .is_some();

                if in_set || in_cache {
                    self.metrics.skipped_consensus_txns_cache_hit.inc();
                    continue;
                }

                all_transactions.push(sequenced_transaction);
            }
        }

        let transactions_to_schedule = self
            .epoch_store
            .process_consensus_transactions_and_commit_boundary(
                all_transactions,
                &self.last_consensus_stats,
                &self.checkpoint_service,
                self.cache_reader.as_ref(),
                &ConsensusCommitInfo::new(&consensus_output),
                &self.metrics,
            )
            .await
            .expect("Unrecoverable error in consensus handler");

        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, self.epoch_store.epoch()];
            if iota_simulator::random::deterministic_probability(key, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        fail_point_async!("crash"); // for tests that produce random crashes

        self.transaction_scheduler
            .schedule(transactions_to_schedule)
            .await;
    }
}
385
386struct AsyncTransactionScheduler {
387    sender: tokio::sync::mpsc::Sender<Vec<VerifiedExecutableTransaction>>,
388}
389
390impl AsyncTransactionScheduler {
391    pub fn start(
392        transaction_manager: Arc<TransactionManager>,
393        epoch_store: Arc<AuthorityPerEpochStore>,
394    ) -> Self {
395        let (sender, recv) = tokio::sync::mpsc::channel(16);
396        spawn_monitored_task!(Self::run(recv, transaction_manager, epoch_store));
397        Self { sender }
398    }
399
400    pub async fn schedule(&self, transactions: Vec<VerifiedExecutableTransaction>) {
401        self.sender.send(transactions).await.ok();
402    }
403
404    pub async fn run(
405        mut recv: tokio::sync::mpsc::Receiver<Vec<VerifiedExecutableTransaction>>,
406        transaction_manager: Arc<TransactionManager>,
407        epoch_store: Arc<AuthorityPerEpochStore>,
408    ) {
409        while let Some(transactions) = recv.recv().await {
410            let _guard = monitored_scope("ConsensusHandler::enqueue");
411            transaction_manager.enqueue(transactions, &epoch_store);
412        }
413    }
414}
415
416/// Consensus handler used by Mysticeti. Since Mysticeti repo is not yet
417/// integrated, we use a channel to receive the consensus output from Mysticeti.
418/// During initialization, the sender is passed into Mysticeti which can send
419/// consensus output to the channel.
420pub struct MysticetiConsensusHandler {
421    handle: Option<tokio::task::JoinHandle<()>>,
422}
423
424impl MysticetiConsensusHandler {
425    pub fn new(
426        mut consensus_handler: ConsensusHandler<CheckpointService>,
427        mut receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
428        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
429    ) -> Self {
430        let handle = spawn_monitored_task!(async move {
431            // TODO: pause when execution is overloaded, so consensus can detect the
432            // backpressure.
433            while let Some(consensus_output) = receiver.recv().await {
434                let commit_index = consensus_output.commit_ref.index;
435                consensus_handler
436                    .handle_consensus_output(consensus_output)
437                    .await;
438                commit_consumer_monitor.set_highest_handled_commit(commit_index);
439            }
440        });
441        Self {
442            handle: Some(handle),
443        }
444    }
445
446    pub async fn abort(&mut self) {
447        if let Some(handle) = self.handle.take() {
448            handle.abort();
449            let _ = handle.await;
450        }
451    }
452}
453
454impl Drop for MysticetiConsensusHandler {
455    fn drop(&mut self) {
456        if let Some(handle) = self.handle.take() {
457            handle.abort();
458        }
459    }
460}
461
462impl<C> ConsensusHandler<C> {
463    fn authenticator_state_update_transaction(
464        &self,
465        round: u64,
466        mut new_active_jwks: Vec<ActiveJwk>,
467    ) -> VerifiedExecutableTransaction {
468        new_active_jwks.sort();
469
470        info!("creating authenticator state update transaction");
471        assert!(self.epoch_store.authenticator_state_enabled());
472        let transaction = VerifiedTransaction::new_authenticator_state_update(
473            self.epoch(),
474            round,
475            new_active_jwks,
476            self.epoch_store
477                .epoch_start_config()
478                .authenticator_obj_initial_shared_version()
479                .expect("authenticator state obj must exist"),
480        );
481        VerifiedExecutableTransaction::new_system(transaction, self.epoch())
482    }
483
484    fn epoch(&self) -> EpochId {
485        self.epoch_store.epoch()
486    }
487}
488
489pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
490    match &transaction.kind {
491        ConsensusTransactionKind::CertifiedTransaction(certificate) => {
492            if certificate.contains_shared_object() {
493                "shared_certificate"
494            } else {
495                "owned_certificate"
496            }
497        }
498        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
499        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
500        ConsensusTransactionKind::CapabilityNotificationV1(_) => "capability_notification_v1",
501        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
502        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
503        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
504    }
505}
506
/// A transaction taken from consensus output (or generated by the handler)
/// together with the execution index assigned to it and the authority it is
/// attributed to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Index of the authority this transaction is attributed to.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the authority this transaction is attributed to.
    pub certificate_author: AuthorityName,
    /// Execution indices (committed round, sub-dag index, transaction index)
    /// assigned to this transaction.
    pub consensus_index: ExecutionIndices,
    /// The transaction itself: either received from consensus or a system
    /// transaction.
    pub transaction: SequencedConsensusTransactionKind,
}
514
515#[derive(Debug, Clone)]
516pub enum SequencedConsensusTransactionKind {
517    External(ConsensusTransaction),
518    System(VerifiedExecutableTransaction),
519}
520
521impl Serialize for SequencedConsensusTransactionKind {
522    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
523        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
524        serializable.serialize(serializer)
525    }
526}
527
528impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
529    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
530        let serializable =
531            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
532        Ok(serializable.into())
533    }
534}
535
536// We can't serialize SequencedConsensusTransactionKind directly because it
537// contains a VerifiedExecutableTransaction, which is not serializable (by
538// design). This wrapper allows us to convert to a serializable format easily.
539#[derive(Debug, Clone, Serialize, Deserialize)]
540enum SerializableSequencedConsensusTransactionKind {
541    External(ConsensusTransaction),
542    System(TrustedExecutableTransaction),
543}
544
545impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
546    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
547        match kind {
548            SequencedConsensusTransactionKind::External(ext) => {
549                SerializableSequencedConsensusTransactionKind::External(ext.clone())
550            }
551            SequencedConsensusTransactionKind::System(txn) => {
552                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
553            }
554        }
555    }
556}
557
558impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
559    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
560        match kind {
561            SerializableSequencedConsensusTransactionKind::External(ext) => {
562                SequencedConsensusTransactionKind::External(ext)
563            }
564            SerializableSequencedConsensusTransactionKind::System(txn) => {
565                SequencedConsensusTransactionKind::System(txn.into())
566            }
567        }
568    }
569}
570
/// Key identifying a sequenced consensus transaction, as returned by
/// `SequencedConsensusTransactionKind::key`.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    /// Key of a transaction received from consensus.
    External(ConsensusTransactionKey),
    /// Digest of a system transaction.
    System(TransactionDigest),
}
576
577impl SequencedConsensusTransactionKind {
578    pub fn key(&self) -> SequencedConsensusTransactionKey {
579        match self {
580            SequencedConsensusTransactionKind::External(ext) => {
581                SequencedConsensusTransactionKey::External(ext.key())
582            }
583            SequencedConsensusTransactionKind::System(txn) => {
584                SequencedConsensusTransactionKey::System(*txn.digest())
585            }
586        }
587    }
588
589    pub fn get_tracking_id(&self) -> u64 {
590        match self {
591            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
592            SequencedConsensusTransactionKind::System(_txn) => 0,
593        }
594    }
595
596    pub fn is_executable_transaction(&self) -> bool {
597        match self {
598            SequencedConsensusTransactionKind::External(ext) => ext.is_user_certificate(),
599            SequencedConsensusTransactionKind::System(_) => true,
600        }
601    }
602
603    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
604        match self {
605            SequencedConsensusTransactionKind::External(ext) => {
606                if let ConsensusTransactionKind::CertifiedTransaction(txn) = &ext.kind {
607                    Some(*txn.digest())
608                } else {
609                    None
610                }
611            }
612            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
613        }
614    }
615
616    pub fn is_end_of_publish(&self) -> bool {
617        match self {
618            SequencedConsensusTransactionKind::External(ext) => {
619                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
620            }
621            SequencedConsensusTransactionKind::System(_) => false,
622        }
623    }
624}
625
626impl SequencedConsensusTransaction {
627    pub fn sender_authority(&self) -> AuthorityName {
628        self.certificate_author
629    }
630
631    pub fn key(&self) -> SequencedConsensusTransactionKey {
632        self.transaction.key()
633    }
634
635    pub fn is_end_of_publish(&self) -> bool {
636        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
637            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
638        } else {
639            false
640        }
641    }
642
643    pub fn is_system(&self) -> bool {
644        matches!(
645            self.transaction,
646            SequencedConsensusTransactionKind::System(_)
647        )
648    }
649
650    pub fn is_user_tx_with_randomness(&self) -> bool {
651        let SequencedConsensusTransactionKind::External(ConsensusTransaction {
652            kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
653            ..
654        }) = &self.transaction
655        else {
656            return false;
657        };
658        certificate.transaction_data().uses_randomness()
659    }
660
661    pub fn as_shared_object_txn(&self) -> Option<&SenderSignedData> {
662        match &self.transaction {
663            SequencedConsensusTransactionKind::External(ConsensusTransaction {
664                kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
665                ..
666            }) if certificate.contains_shared_object() => Some(certificate.data()),
667            SequencedConsensusTransactionKind::System(txn) if txn.contains_shared_object() => {
668                Some(txn.data())
669            }
670            _ => None,
671        }
672    }
673}
674
675#[derive(Debug, Clone, Serialize, Deserialize)]
676pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
677
678#[cfg(test)]
679impl VerifiedSequencedConsensusTransaction {
680    pub fn new_test(transaction: ConsensusTransaction) -> Self {
681        Self(SequencedConsensusTransaction::new_test(transaction))
682    }
683}
684
685impl SequencedConsensusTransaction {
686    pub fn new_test(transaction: ConsensusTransaction) -> Self {
687        Self {
688            certificate_author_index: 0,
689            certificate_author: AuthorityName::ZERO,
690            consensus_index: Default::default(),
691            transaction: SequencedConsensusTransactionKind::External(transaction),
692        }
693    }
694}
695
696/// Represents the information from the current consensus commit.
697pub struct ConsensusCommitInfo {
698    pub round: u64,
699    pub timestamp: u64,
700    pub consensus_commit_digest: ConsensusCommitDigest,
701
702    #[cfg(any(test, feature = "test-utils"))]
703    skip_consensus_commit_prologue_in_test: bool,
704}
705
706impl ConsensusCommitInfo {
707    fn new(consensus_output: &impl ConsensusOutputAPI) -> Self {
708        Self {
709            round: consensus_output.leader_round(),
710            timestamp: consensus_output.commit_timestamp_ms(),
711            consensus_commit_digest: consensus_output.consensus_digest(),
712
713            #[cfg(any(test, feature = "test-utils"))]
714            skip_consensus_commit_prologue_in_test: false,
715        }
716    }
717
718    #[cfg(any(test, feature = "test-utils"))]
719    pub fn new_for_test(
720        commit_round: u64,
721        commit_timestamp: u64,
722        skip_consensus_commit_prologue_in_test: bool,
723    ) -> Self {
724        Self {
725            round: commit_round,
726            timestamp: commit_timestamp,
727            consensus_commit_digest: ConsensusCommitDigest::default(),
728            skip_consensus_commit_prologue_in_test,
729        }
730    }
731
732    #[cfg(any(test, feature = "test-utils"))]
733    pub fn skip_consensus_commit_prologue_in_test(&self) -> bool {
734        self.skip_consensus_commit_prologue_in_test
735    }
736
737    fn consensus_commit_prologue_v1_transaction(
738        &self,
739        epoch: u64,
740        cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
741    ) -> VerifiedExecutableTransaction {
742        let transaction = VerifiedTransaction::new_consensus_commit_prologue_v1(
743            epoch,
744            self.round,
745            self.timestamp,
746            self.consensus_commit_digest,
747            cancelled_txn_version_assignment,
748        );
749        VerifiedExecutableTransaction::new_system(transaction, epoch)
750    }
751
752    pub fn create_consensus_commit_prologue_transaction(
753        &self,
754        epoch: u64,
755        cancelled_txn_version_assignment: Vec<(TransactionDigest, Vec<(ObjectID, SequenceNumber)>)>,
756    ) -> VerifiedExecutableTransaction {
757        self.consensus_commit_prologue_v1_transaction(epoch, cancelled_txn_version_assignment)
758    }
759}
760
761#[cfg(test)]
762mod tests {
763    use consensus_core::{
764        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
765    };
766    use iota_protocol_config::{Chain, ConsensusTransactionOrdering};
767    use iota_types::{
768        base_types::{AuthorityName, IotaAddress, random_object_ref},
769        committee::Committee,
770        messages_consensus::{
771            AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
772        },
773        object::Object,
774        supported_protocol_versions::{
775            SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
776        },
777        transaction::{
778            CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
779        },
780    };
781    use prometheus::Registry;
782
783    use super::*;
784    use crate::{
785        authority::{
786            authority_per_epoch_store::ConsensusStatsAPI,
787            test_authority_builder::TestAuthorityBuilder,
788        },
789        checkpoints::CheckpointServiceNoop,
790        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
791        post_consensus_tx_reorder::PostConsensusTxReorder,
792    };
793
794    #[tokio::test]
795    pub async fn test_consensus_handler() {
796        // GIVEN
797        let mut objects = test_gas_objects();
798        let shared_object = Object::shared_for_testing();
799        objects.push(shared_object.clone());
800
801        let network_config =
802            iota_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
803                .with_objects(objects.clone())
804                .build();
805
806        let state = TestAuthorityBuilder::new()
807            .with_network_config(&network_config, 0)
808            .build()
809            .await;
810
811        let epoch_store = state.epoch_store_for_testing().clone();
812        let new_epoch_start_state = epoch_store.epoch_start_state();
813        let consensus_committee = new_epoch_start_state.get_consensus_committee();
814
815        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
816
817        let mut consensus_handler = ConsensusHandler::new(
818            epoch_store,
819            Arc::new(CheckpointServiceNoop {}),
820            state.transaction_manager().clone(),
821            state.get_object_cache_reader().clone(),
822            Arc::new(ArcSwap::default()),
823            consensus_committee.clone(),
824            metrics,
825        );
826
827        // AND
828        // Create test transactions
829        let transactions = test_certificates(&state, shared_object).await;
830        let mut blocks = Vec::new();
831
832        for (i, transaction) in transactions.iter().enumerate() {
833            let transaction_bytes: Vec<u8> = bcs::to_bytes(
834                &ConsensusTransaction::new_certificate_message(&state.name, transaction.clone()),
835            )
836            .unwrap();
837
838            // AND create block for each transaction
839            let block = VerifiedBlock::new_for_test(
840                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
841                    .set_transactions(vec![Transaction::new(transaction_bytes)])
842                    .build(),
843            );
844
845            blocks.push(block);
846        }
847
848        // AND create the consensus output
849        let leader_block = blocks[0].clone();
850        let committed_sub_dag = CommittedSubDag::new(
851            leader_block.reference(),
852            blocks.clone(),
853            leader_block.timestamp_ms(),
854            CommitRef::new(10, CommitDigest::MIN),
855            vec![],
856        );
857
858        // AND processing the consensus output once
859        consensus_handler
860            .handle_consensus_output(committed_sub_dag.clone())
861            .await;
862
863        // AND capturing the consensus stats
864        let num_blocks = blocks.len();
865        let num_transactions = transactions.len();
866        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
867        assert_eq!(
868            last_consensus_stats_1.index.transaction_index,
869            num_transactions as u64
870        );
871        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
872        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
873        assert_eq!(last_consensus_stats_1.hash, 0);
874        assert_eq!(
875            last_consensus_stats_1.stats.get_num_messages(0),
876            num_blocks as u64
877        );
878        assert_eq!(
879            last_consensus_stats_1.stats.get_num_user_transactions(0),
880            num_transactions as u64
881        );
882
883        // WHEN processing the same output multiple times
884        // THEN the consensus stats do not update
885        for _ in 0..2 {
886            consensus_handler
887                .handle_consensus_output(committed_sub_dag.clone())
888                .await;
889            let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone();
890            assert_eq!(last_consensus_stats_1, last_consensus_stats_2);
891        }
892    }
893
894    #[test]
895    fn test_order_by_gas_price() {
896        let chain = Chain::Unknown;
897        let mut v = vec![
898            cap_txn(10, chain),
899            user_txn(42),
900            user_txn(100),
901            cap_txn(1, chain),
902        ];
903        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
904        assert_eq!(
905            extract(v),
906            vec![
907                "cap(10)".to_string(),
908                "cap(1)".to_string(),
909                "user(100)".to_string(),
910                "user(42)".to_string(),
911            ]
912        );
913
914        let mut v = vec![
915            user_txn(1200),
916            cap_txn(10, chain),
917            user_txn(12),
918            user_txn(1000),
919            user_txn(42),
920            user_txn(100),
921            cap_txn(1, chain),
922            user_txn(1000),
923        ];
924        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
925        assert_eq!(
926            extract(v),
927            vec![
928                "cap(10)".to_string(),
929                "cap(1)".to_string(),
930                "user(1200)".to_string(),
931                "user(1000)".to_string(),
932                "user(1000)".to_string(),
933                "user(100)".to_string(),
934                "user(42)".to_string(),
935                "user(12)".to_string(),
936            ]
937        );
938
939        // If there are no user transactions, the order should be preserved.
940        let mut v = vec![
941            cap_txn(10, chain),
942            eop_txn(12),
943            eop_txn(10),
944            cap_txn(1, chain),
945            eop_txn(11),
946        ];
947        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
948        assert_eq!(
949            extract(v),
950            vec![
951                "cap(10)".to_string(),
952                "eop(12)".to_string(),
953                "eop(10)".to_string(),
954                "cap(1)".to_string(),
955                "eop(11)".to_string(),
956            ]
957        );
958    }
959
960    fn extract(v: Vec<VerifiedSequencedConsensusTransaction>) -> Vec<String> {
961        v.into_iter().map(extract_one).collect()
962    }
963
964    fn extract_one(t: VerifiedSequencedConsensusTransaction) -> String {
965        match t.0.transaction {
966            SequencedConsensusTransactionKind::External(ext) => match ext.kind {
967                ConsensusTransactionKind::EndOfPublish(authority) => {
968                    format!("eop({})", authority.0[0])
969                }
970                ConsensusTransactionKind::CapabilityNotificationV1(cap) => {
971                    format!("cap({})", cap.generation)
972                }
973                ConsensusTransactionKind::CertifiedTransaction(txn) => {
974                    format!("user({})", txn.transaction_data().gas_price())
975                }
976                _ => unreachable!(),
977            },
978            SequencedConsensusTransactionKind::System(_) => unreachable!(),
979        }
980    }
981
982    fn eop_txn(a: u8) -> VerifiedSequencedConsensusTransaction {
983        let mut authority = AuthorityName::default();
984        authority.0[0] = a;
985        txn(ConsensusTransactionKind::EndOfPublish(authority))
986    }
987
988    fn cap_txn(generation: u64, chain: Chain) -> VerifiedSequencedConsensusTransaction {
989        txn(ConsensusTransactionKind::CapabilityNotificationV1(
990            // we don't use the "new" constructor because we need to set the generation
991            AuthorityCapabilitiesV1 {
992                authority: Default::default(),
993                generation,
994                supported_protocol_versions:
995                    SupportedProtocolVersionsWithHashes::from_supported_versions(
996                        SupportedProtocolVersions::SYSTEM_DEFAULT,
997                        chain,
998                    ),
999                available_system_packages: vec![],
1000            },
1001        ))
1002    }
1003
1004    fn user_txn(gas_price: u64) -> VerifiedSequencedConsensusTransaction {
1005        let (committee, keypairs) = Committee::new_simple_test_committee();
1006        let data = SenderSignedData::new(
1007            TransactionData::new_transfer(
1008                IotaAddress::default(),
1009                random_object_ref(),
1010                IotaAddress::default(),
1011                random_object_ref(),
1012                1000 * gas_price,
1013                gas_price,
1014            ),
1015            vec![],
1016        );
1017        txn(ConsensusTransactionKind::CertifiedTransaction(Box::new(
1018            CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
1019        )))
1020    }
1021
1022    fn txn(kind: ConsensusTransactionKind) -> VerifiedSequencedConsensusTransaction {
1023        VerifiedSequencedConsensusTransaction::new_test(ConsensusTransaction {
1024            kind,
1025            tracking_id: Default::default(),
1026        })
1027    }
1028}