Skip to main content

iota_core/
consensus_adapter.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    future::Future,
8    ops::Deref,
9    sync::{
10        Arc,
11        atomic::{AtomicU64, Ordering},
12    },
13    time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use dashmap::{DashMap, try_result::TryResult};
18use futures::{
19    FutureExt, StreamExt,
20    future::{self, Either, select},
21    pin_mut,
22    stream::FuturesUnordered,
23};
24use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
25use iota_simulator::anemo::PeerId;
26use iota_types::{
27    base_types::{AuthorityName, TransactionDigest},
28    committee::Committee,
29    error::{IotaError, IotaResult},
30    fp_ensure,
31    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
32};
33use itertools::Itertools;
34use parking_lot::RwLockReadGuard;
35use prometheus::{
36    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
37    register_histogram_vec_with_registry, register_histogram_with_registry,
38    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
39    register_int_gauge_with_registry,
40};
41use tokio::{
42    sync::{Semaphore, SemaphorePermit, oneshot},
43    task::JoinHandle,
44    time::{
45        Duration, {self},
46    },
47};
48use tracing::{debug, info, trace, warn};
49
50use crate::{
51    authority::authority_per_epoch_store::AuthorityPerEpochStore,
52    checkpoints::CheckpointStore,
53    connection_monitor::ConnectionStatus,
54    consensus_handler::{SequencedConsensusTransactionKey, classify},
55    epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
56    metrics::LatencyObserver,
57};
58
59#[cfg(test)]
60#[path = "unit_tests/consensus_tests.rs"]
61pub mod consensus_tests;
62
/// Histogram buckets shared by the position-related sequencing metrics
/// (`sequencing_certificate_authority_position`,
/// `sequencing_certificate_positions_moved` and
/// `sequencing_certificate_preceding_disconnected`).
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0.0, 1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 50.0, 100.0, 150.0, 200.0,
];
66
/// Prometheus metrics exported by the consensus adapter.
///
/// Every metric is registered against the registry passed to
/// `ConsensusAdapterMetrics::new`.
pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    /// Number of sequencing attempts, labelled by `tx_type`.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Number of successfully sequenced submissions, labelled by `tx_type`.
    pub sequencing_certificate_success: IntCounterVec,
    /// Number of sequencing failures other than timeouts, labelled by `tx_type`.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Sequencing status as reported by consensus (sequenced or garbage
    /// collected), labelled by `tx_type` and `status`.
    pub sequencing_certificate_status: IntCounterVec,
    /// In-flight sequencing requests, labelled by `tx_type`.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Latency until the sequencing engine acknowledges a submission,
    /// labelled by `retry` and `tx_type`.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// Overall sequencing latency, labelled by `position`, `tx_type` and
    /// `processed_method`.
    pub sequencing_certificate_latency: HistogramVec,
    /// Position of this authority when it submitted to consensus.
    pub sequencing_certificate_authority_position: Histogram,
    /// Number of authorities ahead of this one that were filtered out before
    /// submitting to consensus.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Number of authorities at an earlier position that were filtered out
    /// because they appeared disconnected.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Number of processed transactions, labelled by `source` (consensus or
    /// checkpoint).
    pub sequencing_certificate_processed: IntCounterVec,
    /// Number of requests currently blocked waiting for a submit permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Number of transactions submitted to the local consensus instance and
    /// not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Consensus latency estimated by the adapter, in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Resubmission interval (delay step) used by the adapter, in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
}
85
86impl ConsensusAdapterMetrics {
87    pub fn new(registry: &Registry) -> Self {
88        Self {
89            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
90                "sequencing_certificate_attempt",
91                "Counts the number of certificates the validator attempts to sequence.",
92                &["tx_type"],
93                registry,
94            )
95            .unwrap(),
96            sequencing_certificate_success: register_int_counter_vec_with_registry!(
97                "sequencing_certificate_success",
98                "Counts the number of successfully sequenced certificates.",
99                &["tx_type"],
100                registry,
101            )
102            .unwrap(),
103            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
104                "sequencing_certificate_failures",
105                "Counts the number of sequenced certificates that failed other than by timeout.",
106                &["tx_type"],
107                registry,
108            )
109            .unwrap(),
110            sequencing_certificate_status: register_int_counter_vec_with_registry!(
111                "sequencing_certificate_status",
112                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
113                &["tx_type", "status"],
114                registry,
115            )
116            .unwrap(),
117            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
118                "sequencing_certificate_inflight",
119                "The inflight requests to sequence certificates.",
120                &["tx_type"],
121                registry,
122            )
123            .unwrap(),
124            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
125                "sequencing_acknowledge_latency",
126                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
127                &["retry", "tx_type"],
128                LATENCY_SEC_BUCKETS.to_vec(),
129                registry,
130            )
131            .unwrap(),
132            sequencing_certificate_latency: register_histogram_vec_with_registry!(
133                "sequencing_certificate_latency",
134                "The latency for sequencing a certificate.",
135                &["position", "tx_type", "processed_method"],
136                LATENCY_SEC_BUCKETS.to_vec(),
137                registry,
138            )
139            .unwrap(),
140            sequencing_certificate_authority_position: register_histogram_with_registry!(
141                "sequencing_certificate_authority_position",
142                "The position of the authority when submitted a certificate to consensus.",
143                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
144                registry,
145            )
146            .unwrap(),
147            sequencing_certificate_positions_moved: register_histogram_with_registry!(
148                "sequencing_certificate_positions_moved",
149                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
150                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
151                registry,
152            )
153            .unwrap(),
154            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
155                "sequencing_certificate_preceding_disconnected",
156                "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
157                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
158                registry,
159            )
160            .unwrap(),
161            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
162                "sequencing_certificate_processed",
163                "The number of certificates that have been processed either by consensus or checkpoint.",
164                &["source"],
165                registry
166            )
167            .unwrap(),
168            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
169                "sequencing_in_flight_semaphore_wait",
170                "How many requests are blocked on submit_permit.",
171                registry,
172            )
173            .unwrap(),
174            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
175                "sequencing_in_flight_submissions",
176                "Number of transactions submitted to local consensus instance and not yet sequenced",
177                registry,
178            )
179            .unwrap(),
180            sequencing_estimated_latency: register_int_gauge_with_registry!(
181                "sequencing_estimated_latency",
182                "Consensus latency estimated by consensus adapter in milliseconds",
183                registry,
184            )
185            .unwrap(),
186            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
187                "sequencing_resubmission_interval_ms",
188                "Resubmission interval used by consensus adapter in milliseconds",
189                registry,
190            )
191            .unwrap(),
192        }
193    }
194
195    pub fn new_test() -> Self {
196        Self::new(&Registry::default())
197    }
198}
199
/// Block status for internal use in the consensus adapter.
///
/// A payload-free mirror of `starfish_core::BlockStatus`: it only records
/// whether consensus reported the block as sequenced or garbage collected.
/// It derives `Debug`/`PartialEq` so it can be logged, compared, and
/// unwrapped when it travels through the oneshot status channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockStatusInternal {
    /// Consensus reported the block as sequenced.
    Sequenced,
    /// Consensus reported the block as garbage collected.
    GarbageCollected,
}
205
206impl From<starfish_core::BlockStatus> for BlockStatusInternal {
207    fn from(status: starfish_core::BlockStatus) -> Self {
208        match status {
209            starfish_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
210            starfish_core::BlockStatus::GarbageCollected(_) => {
211                BlockStatusInternal::GarbageCollected
212            }
213        }
214    }
215}
216
217pub type BlockStatusReceiver = oneshot::Receiver<BlockStatusInternal>;
218
/// Synchronous submission of consensus transactions.
///
/// `#[mockall::automock]` generates a mock implementation of this trait.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` to consensus in the context of `epoch_store`.
    ///
    /// NOTE(review): the meaning of an `Err` (rejected vs. not delivered)
    /// depends on the implementor, which is not visible here.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}
227
/// Client the `ConsensusAdapter` uses to hand transactions to the consensus
/// node of this authority.
///
/// `#[mockall::automock]` generates a mock implementation of this trait.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` to consensus.
    ///
    /// On success returns a [`BlockStatusReceiver`] that yields the
    /// [`BlockStatusInternal`] reported for the submission.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
237
/// Submit IOTA certificates to the consensus.
///
/// The adapter persists transactions as pending, decides when this authority
/// should submit them (from a digest-derived ordering of the committee,
/// connectivity and reputation scores), and bounds concurrent submissions.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// The checkpoint store for the validator
    checkpoint_store: Arc<CheckpointStore>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit to number of inflight transactions at this node; compared
    /// against `num_inflight_transactions` by `check_limits`.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// Dictates the maximum position from which this authority will submit to
    /// consensus. Even if this authority is elected to submit from a higher
    /// position than this, it will "reset" to the max_submit_position.
    max_submit_position: Option<usize>,
    /// When provided it will override the current back off logic and will use
    /// this value instead as delay step.
    submit_delay_step_override: Option<Duration>,
    /// A structure to check the connection statuses populated by the Connection
    /// Monitor Listener
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// A structure to check the reputation scores populated by Consensus.
    /// Authorities present in the inner map are treated as low scoring when
    /// computing submission positions; the inner handle is replaced through
    /// `swap_low_scoring_authorities`.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// A structure to register metrics
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus
    submit_semaphore: Semaphore,
    /// Observed consensus latency; its estimate drives the submit delay step.
    latency_observer: LatencyObserver,
}
268
/// Source of connectivity information about other authorities, consulted when
/// computing this authority's submission position.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status from `ourself` to `authority`, or `None`
    /// when no status is available (the consensus adapter treats `None` as
    /// disconnected).
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Installs the authority-name to peer-id mapping used for an epoch.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
277
/// Connection state forwarded by the connection monitor, together with the
/// mapping needed to look authorities up by name.
///
/// NOTE(review): presumably the production `CheckConnection` implementation;
/// its impl is not part of this chunk.
pub struct ConnectionMonitorStatus {
    /// Current connection statuses forwarded from the connection monitor,
    /// keyed by peer id.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// A map from authority name to peer id, swappable as a whole.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
284
/// Field-less connection-status stand-in, intended (by its name) for tests.
///
/// NOTE(review): its `CheckConnection` impl, if any, is not part of this chunk.
pub struct ConnectionMonitorStatusForTests {}
286
287impl ConsensusAdapter {
288    /// Make a new Consensus adapter instance.
289    pub fn new(
290        consensus_client: Arc<dyn ConsensusClient>,
291        checkpoint_store: Arc<CheckpointStore>,
292        authority: AuthorityName,
293        connection_monitor_status: Arc<dyn CheckConnection>,
294        max_pending_transactions: usize,
295        max_pending_local_submissions: usize,
296        max_submit_position: Option<usize>,
297        submit_delay_step_override: Option<Duration>,
298        metrics: ConsensusAdapterMetrics,
299    ) -> Self {
300        let num_inflight_transactions = Default::default();
301        let low_scoring_authorities =
302            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
303        Self {
304            consensus_client,
305            checkpoint_store,
306            authority,
307            max_pending_transactions,
308            max_submit_position,
309            submit_delay_step_override,
310            num_inflight_transactions,
311            connection_monitor_status,
312            low_scoring_authorities,
313            metrics,
314            submit_semaphore: Semaphore::new(max_pending_local_submissions),
315            latency_observer: LatencyObserver::new(),
316        }
317    }
318
319    pub fn swap_low_scoring_authorities(
320        &self,
321        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
322    ) {
323        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
324    }
325
326    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
327        // Currently consensus worker might lose transactions on restart, so we need to
328        // resend them.
329        // TODO: get_all_pending_consensus_transactions is called
330        // twice when initializing AuthorityPerEpochStore and here, should not
331        // be a big deal but can be optimized
332        let mut recovered = epoch_store.get_all_pending_consensus_transactions();
333
334        let is_pending_consensus_certificates_empty =
335            if epoch_store.protocol_config().enable_pcool_flow() {
336                // In the P-COOL flow, the list of pending consensus
337                // certificates is always empty.
338                true
339            } else {
340                epoch_store.pending_consensus_certificates_empty()
341            };
342
343        if epoch_store
344            .get_reconfig_state_read_lock_guard()
345            .is_reject_user_certs()
346            && is_pending_consensus_certificates_empty
347        {
348            // If `recovered` does not contain `EndOfPublish` yet, we need to insert it.
349            if !recovered
350                .iter()
351                .any(ConsensusTransaction::is_end_of_publish)
352            {
353                // There are two cases when this is needed:
354                // (1) We send `EndOfPublish` message after removing pending certificates in
355                // `submit_and_wait_inner`. It is possible that node will crash
356                // between those two steps, in which case we might need to
357                // re-introduce `EndOfPublish` message on restart.
358                // (2) If node crashed inside `ConsensusAdapter::close_epoch`,
359                // after reconfig lock state was written to DB and before we persisted
360                // `EndOfPublish` message.
361                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
362            }
363        }
364        debug!(
365            "Submitting {:?} recovered pending consensus transactions to consensus",
366            recovered.len()
367        );
368        for transaction in recovered {
369            if transaction.is_end_of_publish() {
370                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
371            }
372            self.submit_unchecked(&[transaction], epoch_store);
373        }
374    }
375
376    fn await_submit_delay(
377        &self,
378        committee: &Committee,
379        transactions: &[ConsensusTransaction],
380    ) -> (impl Future<Output = ()>, usize, usize, usize) {
381        // Use the minimum digest to compute submit delay.
382        let min_digest = transactions
383            .iter()
384            .filter_map(|tx| match &tx.kind {
385                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
386                    Some(certificate.digest())
387                }
388                ConsensusTransactionKind::UserTransactionV1(_) => {
389                    // P-COOL: no submit delay needed (number of submitting validators
390                    // controlled through another mechanism)
391                    None
392                }
393                _ => None,
394            })
395            .min();
396
397        let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
398            Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
399            _ => (Duration::ZERO, 0, 0, 0),
400        };
401        (
402            tokio::time::sleep(duration),
403            position,
404            positions_moved,
405            preceding_disconnected,
406        )
407    }
408
409    fn await_submit_delay_user_transaction(
410        &self,
411        committee: &Committee,
412        tx_digest: &TransactionDigest,
413    ) -> (Duration, usize, usize, usize) {
414        let (position, positions_moved, preceding_disconnected) =
415            self.submission_position(committee, tx_digest);
416
417        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
418        const MIN_LATENCY: Duration = Duration::from_millis(150);
419        const MAX_LATENCY: Duration = Duration::from_secs(3);
420
421        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
422        self.metrics
423            .sequencing_estimated_latency
424            .set(latency.as_millis() as i64);
425
426        let latency = std::cmp::max(latency, MIN_LATENCY);
427        let latency = std::cmp::min(latency, MAX_LATENCY);
428        let latency = latency * 2;
429        let (delay_step, position) =
430            self.override_by_max_submit_position_settings(latency, position);
431
432        self.metrics
433            .sequencing_resubmission_interval_ms
434            .set(delay_step.as_millis() as i64);
435
436        (
437            delay_step * position as u32,
438            position,
439            positions_moved,
440            preceding_disconnected,
441        )
442    }
443
444    /// Overrides the latency and the position if there are defined settings for
445    /// `max_submit_position` and `submit_delay_step_override`. If the
446    /// `max_submit_position` has defined, then that will always be used
447    /// irrespective of any so far decision. Same for the
448    /// `submit_delay_step_override`.
449    fn override_by_max_submit_position_settings(
450        &self,
451        latency: Duration,
452        mut position: usize,
453    ) -> (Duration, usize) {
454        // Respect any manual override for position and latency from the settings
455        if let Some(max_submit_position) = self.max_submit_position {
456            position = std::cmp::min(position, max_submit_position);
457        }
458
459        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
460        (delay_step, position)
461    }
462
463    /// Check when this authority should submit the certificate to consensus.
464    /// This sorts all authorities based on pseudo-random distribution derived
465    /// from transaction hash.
466    ///
467    /// The function targets having 1 consensus transaction submitted per user
468    /// transaction when system operates normally.
469    ///
470    /// The function returns the position of this authority when it is their
471    /// turn to submit the transaction to consensus.
472    fn submission_position(
473        &self,
474        committee: &Committee,
475        tx_digest: &TransactionDigest,
476    ) -> (usize, usize, usize) {
477        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
478
479        self.check_submission_wrt_connectivity_and_scores(positions)
480    }
481
482    /// This function runs the following algorithm to decide whether or not to
483    /// submit a transaction to consensus.
484    ///
485    /// It takes in a deterministic list that represents positions of all the
486    /// authorities. The authority in the first position will be responsible
487    /// for submitting to consensus, and so we check if we are this
488    /// validator, and if so, return true.
489    ///
490    /// If we are not in that position, we check our connectivity to the
491    /// authority in that position. If we are connected to them, we can
492    /// assume that they are operational and will submit the transaction. If
493    /// we are not connected to them, we assume that they are not operational
494    /// and we will not rely on that authority to submit the transaction. So
495    /// we shift them out of the first position, and run this algorithm
496    /// again on the new set of positions.
497    ///
498    /// This can possibly result in a transaction being submitted twice if an
499    /// authority sees a false negative in connectivity to another, such as
500    /// in the case of a network partition.
501    ///
502    /// Recursively, if the authority further ahead of us in the positions is a
503    /// low performing authority, we will move our positions up one, and
504    /// submit the transaction. This allows maintaining performance overall.
505    /// We will only do this part for authorities that are not low performers
506    /// themselves to prevent extra amplification in the case that the
507    /// positions look like [low_scoring_a1, low_scoring_a2, a3]
508    fn check_submission_wrt_connectivity_and_scores(
509        &self,
510        positions: Vec<AuthorityName>,
511    ) -> (usize, usize, usize) {
512        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
513        if low_scoring_authorities.get(&self.authority).is_some() {
514            return (positions.len(), 0, 0);
515        }
516        let initial_position = get_position_in_list(self.authority, positions.clone());
517        let mut preceding_disconnected = 0;
518        let mut before_our_position = true;
519
520        let filtered_positions: Vec<_> = positions
521            .into_iter()
522            .filter(|authority| {
523                let keep = self.authority == *authority; // don't filter ourself out
524                if keep {
525                    before_our_position = false;
526                }
527
528                // filter out any nodes that appear disconnected
529                let connected = self
530                    .connection_monitor_status
531                    .check_connection(&self.authority, authority)
532                    .unwrap_or(ConnectionStatus::Disconnected)
533                    == ConnectionStatus::Connected;
534                if !connected && before_our_position {
535                    preceding_disconnected += 1; // used for metrics
536                }
537
538                // Filter out low scoring nodes
539                let high_scoring = low_scoring_authorities.get(authority).is_none();
540
541                keep || (connected && high_scoring)
542            })
543            .collect();
544
545        let position = get_position_in_list(self.authority, filtered_positions);
546
547        (
548            position,
549            initial_position - position,
550            preceding_disconnected,
551        )
552    }
553
554    /// This method blocks until transaction is persisted in local database
555    /// It then returns handle to async task, user can join this handle to await
556    /// while transaction is processed by consensus
557    ///
558    /// This method guarantees that once submit(but not returned async handle)
559    /// returns, transaction is persisted and will eventually be sent to
560    /// consensus even after restart
561    ///
562    /// When submitting a certificate caller **must** provide a ReconfigState
563    /// lock guard
564    pub fn submit(
565        self: &Arc<Self>,
566        transaction: ConsensusTransaction,
567        lock: Option<&RwLockReadGuard<ReconfigState>>,
568        epoch_store: &Arc<AuthorityPerEpochStore>,
569    ) -> IotaResult<JoinHandle<()>> {
570        self.submit_batch(&[transaction], lock, epoch_store)
571    }
572
573    pub fn submit_batch(
574        self: &Arc<Self>,
575        transactions: &[ConsensusTransaction],
576        lock: Option<&RwLockReadGuard<ReconfigState>>,
577        epoch_store: &Arc<AuthorityPerEpochStore>,
578    ) -> IotaResult<JoinHandle<()>> {
579        if transactions.len() > 1 {
580            // Soft-bundle batches must be homogeneous: either all
581            // CertifiedTransaction (certificate flow) or all
582            // UserTransactionV1 (P-COOL flow). submit_and_wait_inner
583            // assumes a single transaction kind across the batch.
584            for transaction in transactions {
585                fp_ensure!(
586                    matches!(
587                        transaction.kind,
588                        ConsensusTransactionKind::CertifiedTransaction(_)
589                            | ConsensusTransactionKind::UserTransactionV1(_)
590                    ),
591                    IotaError::InvalidTxKindInSoftBundle
592                );
593            }
594        }
595
596        epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
597
598        Ok(self.submit_unchecked(transactions, epoch_store))
599    }
600
601    /// Returns the number of transactions currently in-flight in consensus.
602    pub fn num_inflight_transactions(&self) -> u64 {
603        self.num_inflight_transactions.load(Ordering::Relaxed)
604    }
605
606    /// Performs weakly consistent checks on internal buffers to quickly
607    /// discard transactions if we are overloaded
608    pub fn check_limits(&self) -> bool {
609        // First check total transactions (waiting and in submission)
610        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
611            > self.max_pending_transactions
612        {
613            return false;
614        }
615        // Then check if submit_semaphore has permits
616        self.submit_semaphore.available_permits() > 0
617    }
618
619    pub(crate) fn check_consensus_overload(&self) -> IotaResult {
620        fp_ensure!(
621            self.check_limits(),
622            IotaError::TooManyTransactionsPendingConsensus
623        );
624        Ok(())
625    }
626
627    fn submit_unchecked(
628        self: &Arc<Self>,
629        transactions: &[ConsensusTransaction],
630        epoch_store: &Arc<AuthorityPerEpochStore>,
631    ) -> JoinHandle<()> {
632        // Reconfiguration lock is dropped when pending_consensus_transactions is
633        // persisted, before it is handled by consensus
634        let async_stage = self
635            .clone()
636            .submit_and_wait(transactions.to_vec(), epoch_store.clone());
637        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
638        // (Limit is not applied atomically, and only to user transactions.)
639        let join_handle = spawn_monitored_task!(async_stage);
640        join_handle
641    }
642
643    async fn submit_and_wait(
644        self: Arc<Self>,
645        transactions: Vec<ConsensusTransaction>,
646        epoch_store: Arc<AuthorityPerEpochStore>,
647    ) {
648        // When epoch_terminated signal is received all pending submit_and_wait_inner
649        // are dropped.
650        //
651        // This is needed because submit_and_wait_inner waits on read_notify for
652        // consensus message to be processed, which may never happen on epoch
653        // boundary.
654        //
655        // In addition to that, within_alive_epoch ensures that all pending consensus
656        // adapter tasks are stopped before reconfiguration can proceed.
657        //
658        // This is essential because consensus workers reuse same ports when consensus
659        // restarts, this means we might be sending transactions from previous
660        // epochs to consensus of new epoch if we have not had this barrier.
661        epoch_store
662            .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
663            .await
664            .ok(); // result here indicates if epoch ended earlier, we don't
665        // care about it
666    }
667
668    async fn submit_and_wait_inner(
669        self: Arc<Self>,
670        transactions: Vec<ConsensusTransaction>,
671        epoch_store: &Arc<AuthorityPerEpochStore>,
672    ) {
673        if transactions.is_empty() {
674            return;
675        }
676
677        // submit_batch enforces that multi-tx batches (soft bundles) are
678        // homogeneous: either all CertifiedTransaction or all UserTransactionV1.
679        // Single-tx submits can be any kind.
680        let is_soft_bundle = transactions.len() > 1;
681
682        let mut transaction_keys = Vec::new();
683
684        for transaction in &transactions {
685            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
686                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
687                epoch_store.record_epoch_pending_certs_process_time_metric();
688            }
689
690            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
691            transaction_keys.push(transaction_key);
692        }
693        let tx_type = if !is_soft_bundle {
694            classify(&transactions[0])
695        } else {
696            "soft_bundle"
697        };
698
699        let mut guard = InflightDropGuard::acquire(&self, tx_type);
700
701        // Create the waiter until the node's turn comes to submit to consensus
702        let (await_submit, position, positions_moved, preceding_disconnected) =
703            self.await_submit_delay(epoch_store.committee(), &transactions[..]);
704
705        // Create the waiter until the transaction is processed by consensus or via
706        // checkpoint
707        let processed_via_consensus_or_checkpoint =
708            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
709        pin_mut!(processed_via_consensus_or_checkpoint);
710
711        let processed_waiter = tokio::select! {
712            // We need to wait for some delay until we submit transaction to the consensus
713            _ = await_submit => Some(processed_via_consensus_or_checkpoint),
714
715            // If epoch ends, don't wait for submit delay
716            _ = epoch_store.user_certs_closed_notify() => {
717                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
718                Some(processed_via_consensus_or_checkpoint)
719            }
720
721            // If transaction is received by consensus or checkpoint while we wait, we are done.
722            // Capture the resolved `ProcessedMethod` so the latency metric and
723            // `latency_observer` accurately reflect whether the early-fire was a
724            // consensus processing, a checkpoint sync, or a `dropped_tx_status_cache`
725            // hit. Without this, the guard would stay at its default `Consensus`,
726            // mislabeling cache-hit retries.
727            method = &mut processed_via_consensus_or_checkpoint => {
728                guard.processed_method = method;
729                None
730            }
731        };
732
733        // Log warnings for administrative transactions that fail to get sequenced
734        let _monitor = if !is_soft_bundle
735            && matches!(
736                transactions[0].kind,
737                ConsensusTransactionKind::EndOfPublish(_)
738                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
739                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
740                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
741            ) {
742            let transaction_keys = transaction_keys.clone();
743            Some(CancelOnDrop(spawn_monitored_task!(async {
744                let mut i = 0u64;
745                loop {
746                    i += 1;
747                    const WARN_DELAY_S: u64 = 30;
748                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
749                    let total_wait = i * WARN_DELAY_S;
750                    warn!(
751                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
752                        total_wait, transaction_keys
753                    );
754                }
755            })))
756        } else {
757            None
758        };
759
760        if let Some(processed_waiter) = processed_waiter {
761            debug!("Submitting {:?} to consensus", transaction_keys);
762
763            // populate the position only when this authority submits the transaction
764            // to consensus
765            guard.position = Some(position);
766            guard.positions_moved = Some(positions_moved);
767            guard.preceding_disconnected = Some(preceding_disconnected);
768
769            let _permit: SemaphorePermit = self
770                .submit_semaphore
771                .acquire()
772                .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
773                .await
774                .expect("Consensus adapter does not close semaphore");
775            let _in_flight_submission_guard =
776                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
777
778            // We enter this branch when in select above await_submit completed and
779            // processed_waiter is pending This means it is time for us to
780            // submit transaction to consensus
781            let submit_inner = async {
782                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
783
784                loop {
785                    // Submit the transaction to consensus and return the submit result with a
786                    // status waiter
787                    let status_waiter = self
788                        .submit_inner(
789                            &transactions,
790                            epoch_store,
791                            &transaction_keys,
792                            tx_type,
793                            is_soft_bundle,
794                        )
795                        .await;
796
797                    match status_waiter.await {
798                        Ok(BlockStatusInternal::Sequenced) => {
799                            self.metrics
800                                .sequencing_certificate_status
801                                .with_label_values(&[tx_type, "sequenced"])
802                                .inc();
803                            // Block has been sequenced. Nothing more to do, we do have guarantees
804                            // that the transaction will appear in consensus output.
805                            trace!(
806                                "Transaction {transaction_keys:?} has been sequenced by consensus."
807                            );
808                            break;
809                        }
810                        Ok(BlockStatusInternal::GarbageCollected) => {
811                            self.metrics
812                                .sequencing_certificate_status
813                                .with_label_values(&[tx_type, "garbage_collected"])
814                                .inc();
815                            // Block has been garbage collected and we have no guarantees that the
816                            // transaction will appear in consensus output. We'll
817                            // resubmit the transaction to consensus. If the transaction has been
818                            // already "processed", then probably someone else has submitted
819                            // the transaction and managed to get sequenced. Then this future will
820                            // have been cancelled anyways so no need to check here on the processed
821                            // output.
822                            debug!(
823                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
824                            );
825                            time::sleep(RETRY_DELAY_STEP).await;
826                            continue;
827                        }
828                        Err(err) => {
829                            warn!(
830                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
831                                err
832                            );
833                            time::sleep(RETRY_DELAY_STEP).await;
834                            continue;
835                        }
836                    }
837                }
838            };
839
840            guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
841                Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
842                Either::Right(((), processed_waiter)) => {
843                    debug!("Submitted {transaction_keys:?} to consensus");
844                    processed_waiter.await
845                }
846            };
847        }
848        debug!("{transaction_keys:?} processed by consensus");
849
850        let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
851        epoch_store
852            .remove_pending_consensus_transactions(&consensus_keys)
853            .expect("Storage error when removing consensus transaction");
854
855        let is_user_tx = is_soft_bundle
856            || if epoch_store.protocol_config().enable_pcool_flow() {
857                // In the P-COOL flow, `UserTransactionV1` kind corresponds
858                // to user transactions.
859                matches!(
860                    transactions[0].kind,
861                    ConsensusTransactionKind::UserTransactionV1(_)
862                )
863            } else {
864                // In the certificate mode, `CertifiedTransaction` kind corresponds
865                // to user transactions.
866                matches!(
867                    transactions[0].kind,
868                    ConsensusTransactionKind::CertifiedTransaction(_)
869                )
870            };
871        let send_end_of_publish = if is_user_tx {
872            if epoch_store.protocol_config().enable_pcool_flow() {
873                // In the P-COOL flow, `EndOfPublish` is sent solely from
874                // `close_epoch`. There is no pending certificate drain to
875                // monitor here, and sending from this per-transaction callback
876                // would produce N duplicate EndOfPublish messages (one per
877                // in-flight user transaction completing after `RejectUserCerts`
878                // is set).
879                false
880            } else {
881                // In certificate mode, `EndOfPublish` is sent once the list of
882                // pending consensus certificates is drained. Multiple tasks can
883                // enter here concurrently with `pending_count == 0`, producing
884                // duplicate messages — this is rare and does not affect
885                // correctness.
886                //
887                // Note: there could be a race condition here where we enter
888                // this check in `RejectAllCerts` state. In that case we don't
889                // need to send `EndOfPublish` because the condition to enter
890                // `RejectAllCerts` is when 2f+1 other validators already
891                // sequenced their `EndOfPublish` message.
892                //
893                // TODO: This entire certificate-mode drain logic can be removed
894                // once the certificate flow is fully cleaned up.
895                if epoch_store
896                    .get_reconfig_state_read_lock_guard()
897                    .is_reject_user_certs()
898                {
899                    let pending_count = epoch_store.pending_consensus_certificates_count();
900                    debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
901
902                    pending_count == 0 // send end of epoch if no pending certificates
903                } else {
904                    false
905                }
906            }
907        } else {
908            false
909        };
910        if send_end_of_publish {
911            // Spawn a separate task for EndOfPublish so that
912            // submit_and_wait_inner returns promptly after the original
913            // transaction is processed. Awaiting the retry loop inline
914            // would hold the InflightDropGuard and inflate in-flight
915            // metrics for the duration of retries.
916            let adapter = self.clone();
917            let epoch_store = epoch_store.clone();
918            spawn_monitored_task!(async move {
919                if epoch_store
920                    .within_alive_epoch(adapter.submit_end_of_publish_with_retry(&epoch_store))
921                    .await
922                    .is_err()
923                {
924                    warn!(
925                        epoch = ?epoch_store.epoch(),
926                        "EndOfPublish submission cancelled: epoch has ended",
927                    );
928                }
929            });
930        }
931        self.metrics
932            .sequencing_certificate_success
933            .with_label_values(&[tx_type])
934            .inc();
935    }
936
937    async fn submit_inner(
938        self: &Arc<Self>,
939        transactions: &[ConsensusTransaction],
940        epoch_store: &Arc<AuthorityPerEpochStore>,
941        transaction_keys: &[SequencedConsensusTransactionKey],
942        tx_type: &str,
943        is_soft_bundle: bool,
944    ) -> BlockStatusReceiver {
945        let ack_start = Instant::now();
946        let mut retries: u32 = 0;
947
948        let status_waiter = loop {
949            match self
950                .consensus_client
951                .submit(transactions, epoch_store)
952                .await
953            {
954                Err(err) => {
955                    // This can happen during reconfig, or when consensus has full internal buffers
956                    // and needs to back pressure, so retry a few times before logging warnings.
957                    if retries > 30
958                        || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
959                    {
960                        warn!(
961                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
962                        );
963                    }
964                    self.metrics
965                        .sequencing_certificate_failures
966                        .with_label_values(&[tx_type])
967                        .inc();
968                    retries += 1;
969
970                    if !is_soft_bundle && transactions[0].kind.is_dkg() {
971                        // Shorter delay for DKG messages, which are time-sensitive and happen at
972                        // start-of-epoch when submit errors due to active reconfig are likely.
973                        time::sleep(Duration::from_millis(100)).await;
974                    } else {
975                        time::sleep(Duration::from_secs(10)).await;
976                    };
977                }
978                Ok(status_waiter) => {
979                    break status_waiter;
980                }
981            }
982        };
983
984        // we want to record the num of retries when reporting latency but to avoid
985        // label cardinality we do some simple bucketing to give us a good
986        // enough idea of how many retries happened associated with the latency.
987        let bucket = match retries {
988            0..=10 => retries.to_string(), // just report the retry count as is
989            11..=20 => "between_10_and_20".to_string(),
990            21..=50 => "between_20_and_50".to_string(),
991            51..=100 => "between_50_and_100".to_string(),
992            _ => "over_100".to_string(),
993        };
994
995        self.metrics
996            .sequencing_acknowledge_latency
997            .with_label_values(&[bucket.as_str(), tx_type])
998            .observe(ack_start.elapsed().as_secs_f64());
999
1000        status_waiter
1001    }
1002
    /// Waits until every transaction in `transaction_keys` has been observed
    /// as handled: sequenced in consensus output, executed via a checkpoint
    /// (state sync), or reported as dropped.
    ///
    /// Each key is awaited independently, and the per-key outcomes are folded
    /// into a single result:
    /// - `ProcessedMethod::Dropped` if any key was reported as dropped;
    /// - otherwise `ProcessedMethod::Checkpoint` if any key was resolved via a
    ///   checkpoint (its transaction was executed in a checkpoint or, for a
    ///   checkpoint signature, a checkpoint with an equal or higher sequence
    ///   number was synced);
    /// - otherwise `ProcessedMethod::Consensus`.
    ///
    /// Every per-key resolution also increments
    /// `sequencing_certificate_processed` with a label naming the path taken.
    ///
    /// # Panics
    /// Panics on a storage error from the consensus-processed or the
    /// executed-in-checkpoint notification.
    async fn await_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            // Only certificate and user-transaction keys carry a transaction
            // digest. Keys without one cannot be observed via checkpoint
            // execution or as dropped (those branches below stay disabled or
            // pending).
            let transaction_digests = match transaction_key {
                SequencedConsensusTransactionKey::External(
                    ConsensusTransactionKey::Certificate(digest),
                )
                | SequencedConsensusTransactionKey::External(
                    ConsensusTransactionKey::UserTransaction(digest),
                ) => vec![digest],
                _ => vec![],
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to get
                // notified when a checkpoint with equal or higher sequence
                // number has been already synced. This way we don't try to unnecessarily
                // sequence the signature for an already verified checkpoint.
                Either::Left(
                    self.checkpoint_store
                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
                )
            } else {
                // Never resolves, so this select branch can never win.
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus,
            // executed in a checkpoint or dropped. We could equally just get
            // notified in aggregate when all transactions are processed, but
            // with this approach can get notified in a more fine-grained way as
            // transactions can be marked as processed in different ways. This
            // is mostly a concern for the soft-bundle transactions.
            let dropped_digest = transaction_digests.first().copied();
            notifications.push(async move {
                // First branch to complete decides how this key was handled.
                // The consensus and dropped branches return early; the two
                // checkpoint branches fall through to the `Checkpoint` result.
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    // Disabled (never polled) when the key has no digest.
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    _ = checkpoint_synced_future => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                    // NOTE(review): the result of `notify_read_dropped_digests` is
                    // discarded, so an error returned by it also completes this
                    // branch and is counted as `Dropped`. Confirm this is intended.
                    _ = async move {
                        if let Some(d) = dropped_digest {
                            let _err = epoch_store.notify_read_dropped_digests(d).await;
                        } else {
                            future::pending::<()>().await;
                        }
                    } => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["dropped"]).inc();
                        return ProcessedMethod::Dropped;
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

        // Wait for all keys, then report the "strongest" outcome:
        // Dropped > Checkpoint > Consensus.
        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        if processed_methods.contains(&ProcessedMethod::Dropped) {
            ProcessedMethod::Dropped
        } else if processed_methods.contains(&ProcessedMethod::Checkpoint) {
            ProcessedMethod::Checkpoint
        } else {
            ProcessedMethod::Consensus
        }
    }
1085
1086    /// Submits an `EndOfPublish` message to consensus with exponential
1087    /// backoff (capped at `MAX_BACKOFF`). Retries indefinitely on any
1088    /// error — both transient failures (e.g. DB write errors in
1089    /// `insert_pending_consensus_transactions`) and permanent ones (e.g.
1090    /// `EpochEnded` from `tables()`). A missing `EndOfPublish` would
1091    /// stall the epoch, so the loop never gives up on its own.
1092    ///
1093    /// Callers **must** wrap this with `epoch_store.within_alive_epoch()`
1094    /// to cancel retries when the epoch terminates — this is the
1095    /// mechanism that stops the loop on permanent `EpochEnded` errors.
1096    async fn submit_end_of_publish_with_retry(
1097        self: &Arc<Self>,
1098        epoch_store: &Arc<AuthorityPerEpochStore>,
1099    ) {
1100        const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
1101        const MAX_BACKOFF: Duration = Duration::from_secs(10);
1102
1103        info!(
1104            epoch = ?epoch_store.epoch(),
1105            authority = ?self.authority,
1106            "Sending EndOfPublish message to consensus",
1107        );
1108
1109        let mut attempt: u32 = 0;
1110        loop {
1111            match self.submit(
1112                ConsensusTransaction::new_end_of_publish(self.authority),
1113                None,
1114                epoch_store,
1115            ) {
1116                Ok(_) => return,
1117                Err(IotaError::EpochEnded(_)) => {
1118                    warn!(
1119                        epoch = ?epoch_store.epoch(),
1120                        authority = ?self.authority,
1121                        "EndOfPublish submission stopped: epoch has ended",
1122                    );
1123                    return;
1124                }
1125                Err(err) => {
1126                    let backoff = (INITIAL_BACKOFF * 2u32.pow(attempt.min(10))).min(MAX_BACKOFF);
1127                    warn!(
1128                        epoch = ?epoch_store.epoch(),
1129                        authority = ?self.authority,
1130                        attempt,
1131                        "Failed to submit EndOfPublish, retrying in {:?}: {:?}",
1132                        backoff,
1133                        err,
1134                    );
1135                    tokio::time::sleep(backoff).await;
1136                    attempt = attempt.saturating_add(1);
1137                }
1138            }
1139        }
1140    }
1141}
1142
1143impl CheckConnection for ConnectionMonitorStatus {
1144    fn check_connection(
1145        &self,
1146        ourself: &AuthorityName,
1147        authority: &AuthorityName,
1148    ) -> Option<ConnectionStatus> {
1149        if ourself == authority {
1150            return Some(ConnectionStatus::Connected);
1151        }
1152
1153        let mapping = self.authority_names_to_peer_ids.load_full();
1154        let peer_id = match mapping.get(authority) {
1155            Some(p) => p,
1156            None => {
1157                warn!(
1158                    "failed to find peer {:?} in connection monitor listener",
1159                    authority
1160                );
1161                return None;
1162            }
1163        };
1164
1165        let res = match self.connection_statuses.try_get(peer_id) {
1166            TryResult::Present(c) => Some(c.value().clone()),
1167            TryResult::Absent => None,
1168            TryResult::Locked => {
1169                // update is in progress, assume the status is still or becoming disconnected
1170                Some(ConnectionStatus::Disconnected)
1171            }
1172        };
1173        res
1174    }
1175    fn update_mapping_for_epoch(
1176        &self,
1177        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1178    ) {
1179        self.authority_names_to_peer_ids
1180            .swap(Arc::new(authority_names_to_peer_ids));
1181    }
1182}
1183
1184impl CheckConnection for ConnectionMonitorStatusForTests {
1185    fn check_connection(
1186        &self,
1187        _ourself: &AuthorityName,
1188        _authority: &AuthorityName,
1189    ) -> Option<ConnectionStatus> {
1190        Some(ConnectionStatus::Connected)
1191    }
1192    fn update_mapping_for_epoch(
1193        &self,
1194        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1195    ) {
1196    }
1197}
1198
1199pub fn get_position_in_list(
1200    search_authority: AuthorityName,
1201    positions: Vec<AuthorityName>,
1202) -> usize {
1203    positions
1204        .into_iter()
1205        .find_position(|authority| *authority == search_authority)
1206        .expect("Couldn't find ourselves in shuffled committee")
1207        .0
1208}
1209
1210impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1211    /// This method is called externally to begin reconfiguration.
1212    /// It transitions the reconfig state to reject new user transactions.
1213    /// `ConsensusAdapter` will send `EndOfPublish` once all pending
1214    /// transactions are drained (in the certificate mode) or right away
1215    /// (in the P-COOL flow). Submission is asynchronous —
1216    /// a background task handles retries so this method returns promptly.
1217    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1218        let send_end_of_publish = {
1219            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1220            if !reconfig_guard.should_accept_user_certs() {
1221                // Allow caller to call this method multiple times
1222                return;
1223            }
1224
1225            let send_end_of_publish = if epoch_store.protocol_config().enable_pcool_flow() {
1226                // In the P-COOL flow, there are no pending consensus
1227                // certificates, so `EndOfPublish` is always sent immediately.
1228                debug!(epoch=?epoch_store.epoch(), "Closing epoch in P-COOL mode");
1229
1230                true
1231            } else {
1232                // In certificate mode, `EndOfPublish` is sent only once the list
1233                // of pending consensus certificates is drained.
1234                let pending_count = epoch_store.pending_consensus_certificates_count();
1235                debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
1236
1237                pending_count == 0 // send end of epoch if no pending certificates
1238            };
1239
1240            epoch_store.close_user_certs(reconfig_guard);
1241
1242            send_end_of_publish
1243            // reconfig_guard lock is dropped here.
1244        };
1245
1246        if send_end_of_publish {
1247            // Spawned because ReconfigurationInitiator::close_epoch is
1248            // sync — it cannot await. This is safe: by this point
1249            // close_user_certs() has already set the reconfig state to
1250            // reject new transactions, so no further user work depends
1251            // on this method returning. The background task retries
1252            // until the message is delivered or the epoch terminates.
1253            let adapter = self.clone();
1254            let epoch_store = epoch_store.clone();
1255            spawn_monitored_task!(async move {
1256                if epoch_store
1257                    .within_alive_epoch(adapter.submit_end_of_publish_with_retry(&epoch_store))
1258                    .await
1259                    .is_err()
1260                {
1261                    warn!(
1262                        epoch = ?epoch_store.epoch(),
1263                        "EndOfPublish submission cancelled: epoch has ended",
1264                    );
1265                }
1266            });
1267        }
1268    }
1269}
1270
1271struct CancelOnDrop<T>(JoinHandle<T>);
1272
1273impl<T> Deref for CancelOnDrop<T> {
1274    type Target = JoinHandle<T>;
1275
1276    fn deref(&self) -> &Self::Target {
1277        &self.0
1278    }
1279}
1280
1281impl<T> Drop for CancelOnDrop<T> {
1282    fn drop(&mut self) {
1283        self.0.abort();
1284    }
1285}
1286
1287/// Tracks number of inflight consensus requests and relevant metrics
1288struct InflightDropGuard<'a> {
1289    adapter: &'a ConsensusAdapter,
1290    start: Instant,
1291    position: Option<usize>,
1292    positions_moved: Option<usize>,
1293    preceding_disconnected: Option<usize>,
1294    tx_type: &'static str,
1295    processed_method: ProcessedMethod,
1296}
1297
/// How a consensus request was observed to have been handled.
#[derive(Clone, Copy, PartialEq, Eq)]
enum ProcessedMethod {
    /// Seen in consensus output.
    Consensus,
    /// Resolved through a checkpoint (executed in one, or covered by a synced one).
    Checkpoint,
    /// Reported as dropped.
    Dropped,
}
1304
1305impl<'a> InflightDropGuard<'a> {
1306    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1307        adapter
1308            .num_inflight_transactions
1309            .fetch_add(1, Ordering::SeqCst);
1310        adapter
1311            .metrics
1312            .sequencing_certificate_inflight
1313            .with_label_values(&[tx_type])
1314            .inc();
1315        adapter
1316            .metrics
1317            .sequencing_certificate_attempt
1318            .with_label_values(&[tx_type])
1319            .inc();
1320        Self {
1321            adapter,
1322            start: Instant::now(),
1323            position: None,
1324            positions_moved: None,
1325            preceding_disconnected: None,
1326            tx_type,
1327            processed_method: ProcessedMethod::Consensus,
1328        }
1329    }
1330}
1331
1332impl Drop for InflightDropGuard<'_> {
1333    fn drop(&mut self) {
1334        self.adapter
1335            .num_inflight_transactions
1336            .fetch_sub(1, Ordering::SeqCst);
1337        self.adapter
1338            .metrics
1339            .sequencing_certificate_inflight
1340            .with_label_values(&[self.tx_type])
1341            .dec();
1342
1343        let position = if let Some(position) = self.position {
1344            self.adapter
1345                .metrics
1346                .sequencing_certificate_authority_position
1347                .observe(position as f64);
1348            position.to_string()
1349        } else {
1350            "not_submitted".to_string()
1351        };
1352
1353        if let Some(positions_moved) = self.positions_moved {
1354            self.adapter
1355                .metrics
1356                .sequencing_certificate_positions_moved
1357                .observe(positions_moved as f64);
1358        };
1359
1360        if let Some(preceding_disconnected) = self.preceding_disconnected {
1361            self.adapter
1362                .metrics
1363                .sequencing_certificate_preceding_disconnected
1364                .observe(preceding_disconnected as f64);
1365        };
1366
1367        let latency = self.start.elapsed();
1368        let processed_method = match self.processed_method {
1369            ProcessedMethod::Consensus => "processed_via_consensus",
1370            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1371            ProcessedMethod::Dropped => "dropped",
1372        };
1373        self.adapter
1374            .metrics
1375            .sequencing_certificate_latency
1376            .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1377            .observe(latency.as_secs_f64());
1378
1379        // Only sample latency after consensus quorum is up. Otherwise, the wait for
1380        // consensus quorum at the beginning of an epoch can distort the sampled
1381        // latencies. Technically there are more system transaction types that
1382        // can be included in samples after the first consensus commit, but this
1383        // set of types should be enough.
1384        if self.position == Some(0) {
1385            // Transaction types below require quorum existed in the current epoch.
1386            // TODO: refactor tx_type to enum.
1387            let sampled = matches!(
1388                self.tx_type,
1389                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1390            );
1391            // Exclude checkpoint-synced and dropped txs from the latency observer:
1392            // their latency reflects state-sync timing or rejection, not consensus
1393            // throughput, and would skew the observed consensus latency.
1394            if sampled && self.processed_method == ProcessedMethod::Consensus {
1395                self.adapter.latency_observer.report(latency);
1396            }
1397        }
1398    }
1399}
1400
1401impl SubmitToConsensus for Arc<ConsensusAdapter> {
1402    fn submit_to_consensus(
1403        &self,
1404        transactions: &[ConsensusTransaction],
1405        epoch_store: &Arc<AuthorityPerEpochStore>,
1406    ) -> IotaResult {
1407        self.submit_batch(transactions, None, epoch_store)
1408            .map(|_| ())
1409    }
1410}
1411
1412pub fn position_submit_certificate(
1413    committee: &Committee,
1414    ourselves: &AuthorityName,
1415    tx_digest: &TransactionDigest,
1416) -> usize {
1417    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1418    get_position_in_list(*ourselves, validators)
1419}
1420
#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        checkpoints::CheckpointStore,
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        starfish_adapter::LazyStarfishClient,
    };

    /// Builds an epoch-0 test committee of `size` freshly generated
    /// authorities, each with a random stake drawn from `0..10`. The stakes
    /// are passed to `Committee::new_for_testing_with_normalized_voting_power`.
    ///
    /// The committee is deterministic for a given RNG state, because the key
    /// pair and the stake are drawn from `rng` in a fixed order.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.into_iter().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // When we define max submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyStarfishClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        // transaction to submit
        let tx_digest = TransactionDigest::generate(&mut rng);

        // Ensure that the original position is higher than the configured
        // maximum. No authority is expected to be moved in the ordering
        // (presumably because the test connection monitor reports every peer
        // as connected).
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        // NOTE: the previous `assert!(!positions_moved > 0)` parsed as
        // `(!positions_moved) > 0` (bitwise NOT), which is vacuously true.
        assert_eq!(positions_moved, 0);

        // Make sure that position is clamped to the configured max submit
        // position (1) and that the configured delay step is used verbatim.
        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Without submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyStarfishClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        // delay_step * position * 2 = 1 * 7 * 2 = 14
        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }

    #[test]
    fn test_position_submit_certificate() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // generate random transaction digests, and account for validator selection
        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _ in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                // Every position must be a valid index into the committee.
                assert!(f < committee.num_members());
                if f == 0 {
                    // One and only one validator gets position 0
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}