iota_core/consensus_adapter.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashMap,
    future::Future,
    ops::Deref,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::Instant,
};

use arc_swap::ArcSwap;
use dashmap::{DashMap, try_result::TryResult};
use futures::{
    FutureExt, StreamExt,
    future::{self, Either, select},
    pin_mut,
    stream::FuturesUnordered,
};
use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
use iota_simulator::anemo::PeerId;
use iota_types::{
    base_types::{AuthorityName, TransactionDigest},
    committee::Committee,
    error::{IotaError, IotaResult},
    fp_ensure,
    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
};
use itertools::Itertools;
use parking_lot::RwLockReadGuard;
use prometheus::{
    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use tokio::{
    sync::{Semaphore, SemaphorePermit, oneshot},
    task::JoinHandle,
    time::{self, Duration},
};
use tracing::{debug, info, trace, warn};

use crate::{
    authority::authority_per_epoch_store::AuthorityPerEpochStore,
    checkpoints::CheckpointStore,
    connection_monitor::ConnectionStatus,
    consensus_handler::{SequencedConsensusTransactionKey, classify},
    epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
    metrics::LatencyObserver,
};

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;

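// Buckets for the position-related histograms below (authority position,
// positions moved, preceding disconnected).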
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];

pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    pub sequencing_certificate_attempt: IntCounterVec,
    pub sequencing_certificate_success: IntCounterVec,
    pub sequencing_certificate_failures: IntCounterVec,
    pub sequencing_certificate_status: IntCounterVec,
    pub sequencing_certificate_inflight: IntGaugeVec,
    pub sequencing_acknowledge_latency: HistogramVec,
    pub sequencing_certificate_latency: HistogramVec,
    pub sequencing_certificate_authority_position: Histogram,
    pub sequencing_certificate_positions_moved: Histogram,
    pub sequencing_certificate_preceding_disconnected: Histogram,
    pub sequencing_certificate_processed: IntCounterVec,
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    pub sequencing_in_flight_submissions: IntGauge,
    pub sequencing_estimated_latency: IntGauge,
    pub sequencing_resubmission_interval_ms: IntGauge,
}

impl ConsensusAdapterMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from the sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric.",
                &["retry", "tx_type"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when it submitted a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position and were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            )
            .unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to the local consensus instance and not yet sequenced.",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by the consensus adapter, in milliseconds.",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by the consensus adapter, in milliseconds.",
                registry,
            )
            .unwrap(),
        }
    }

    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }
}

/// Block status for internal use in the consensus adapter, serving both
/// Mysticeti and Starfish.
pub enum BlockStatusInternal {
    Sequenced,
    GarbageCollected,
}

impl From<consensus_core::BlockStatus> for BlockStatusInternal {
    fn from(status: consensus_core::BlockStatus) -> Self {
        match status {
            consensus_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
            consensus_core::BlockStatus::GarbageCollected(_) => {
                BlockStatusInternal::GarbageCollected
            }
        }
    }
}

impl From<starfish_core::BlockStatus> for BlockStatusInternal {
    fn from(status: starfish_core::BlockStatus) -> Self {
        match status {
            starfish_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
            starfish_core::BlockStatus::GarbageCollected(_) => {
                BlockStatusInternal::GarbageCollected
            }
        }
    }
}

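/// Resolves once consensus reports whether the block carrying the submitted
/// transactions was sequenced or garbage collected.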
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatusInternal>;

#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
248
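// Note: `#[mockall::automock]` on the traits above generates
// `MockSubmitToConsensus` and `MockConsensusClient`, which unit tests can use
// as consensus test doubles.
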
/// Submit IOTA certificates to the consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// The checkpoint store for the validator.
    checkpoint_store: Arc<CheckpointStore>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit to the number of inflight transactions at this node.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// Dictates the maximum position from which this authority will submit to
    /// consensus. Even if it is elected to submit from a higher position than
    /// this, it will "reset" to the max_submit_position.
    max_submit_position: Option<usize>,
    /// When provided it will override the current back off logic and will use
    /// this value instead as delay step.
    submit_delay_step_override: Option<Duration>,
    /// A structure to check the connection statuses populated by the Connection
    /// Monitor Listener
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// A structure to check the reputation scores populated by Consensus
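    /// (the nested ArcSwap lets `swap_low_scoring_authorities` replace the
    /// score handle wholesale without recreating the adapter).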
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// A structure to register metrics
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus
    submit_semaphore: Semaphore,
    latency_observer: LatencyObserver,
}

pub trait CheckConnection: Send + Sync {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}

pub struct ConnectionMonitorStatus {
    /// Current connection statuses forwarded from the connection monitor
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// A map from authority name to peer id
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}

pub struct ConnectionMonitorStatusForTests {}

impl ConsensusAdapter {
    /// Make a new Consensus adapter instance.
    pub fn new(
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
        authority: AuthorityName,
        connection_monitor_status: Arc<dyn CheckConnection>,
        max_pending_transactions: usize,
        max_pending_local_submissions: usize,
        max_submit_position: Option<usize>,
        submit_delay_step_override: Option<Duration>,
        metrics: ConsensusAdapterMetrics,
    ) -> Self {
        let num_inflight_transactions = Default::default();
        let low_scoring_authorities =
            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
        Self {
            consensus_client,
            checkpoint_store,
            authority,
            max_pending_transactions,
            max_submit_position,
            submit_delay_step_override,
            num_inflight_transactions,
            connection_monitor_status,
            low_scoring_authorities,
            metrics,
            submit_semaphore: Semaphore::new(max_pending_local_submissions),
            latency_observer: LatencyObserver::new(),
        }
    }

    pub fn swap_low_scoring_authorities(
        &self,
        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    ) {
        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
    }

    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // Currently the consensus worker might lose transactions on restart, so we
        // need to resend them.
        // TODO: get_all_pending_consensus_transactions is called
        // twice when initializing AuthorityPerEpochStore and here; it should
        // not be a big deal but can be optimized.
        let mut recovered = epoch_store.get_all_pending_consensus_transactions();

        #[expect(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
        if epoch_store
            .get_reconfig_state_read_lock_guard()
            .is_reject_user_certs()
            && epoch_store.pending_consensus_certificates_empty()
        {
            if !recovered
                .iter()
                .any(ConsensusTransaction::is_end_of_publish)
            {
                // There are two cases when this is needed:
                // (1) We send the EndOfPublish message after removing pending certificates in
                // submit_and_wait_inner. It is possible that the node will crash
                // between those two steps, in which case we might need to
                // re-introduce the EndOfPublish message on restart.
                // (2) The node crashed inside ConsensusAdapter::close_epoch,
                // after the reconfig lock state was written to DB and before we persisted
                // the EndOfPublish message.
                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
            }
        }
        debug!(
            "Submitting {:?} recovered pending consensus transactions to consensus",
            recovered.len()
        );
        for transaction in recovered {
            if transaction.is_end_of_publish() {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
            }
            self.submit_unchecked(&[transaction], epoch_store);
        }
    }

    fn await_submit_delay(
        &self,
        committee: &Committee,
        transactions: &[ConsensusTransaction],
    ) -> (impl Future<Output = ()>, usize, usize, usize) {
        // Use the minimum digest to compute submit delay.
        let min_digest = transactions
            .iter()
            .filter_map(|tx| match &tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
                    Some(certificate.digest())
                }
                _ => None,
            })
            .min();

        let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
            Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
            _ => (Duration::ZERO, 0, 0, 0),
        };
        (
            tokio::time::sleep(duration),
            position,
            positions_moved,
            preceding_disconnected,
        )
    }

    fn await_submit_delay_user_transaction(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (Duration, usize, usize, usize) {
        let (position, positions_moved, preceding_disconnected) =
            self.submission_position(committee, tx_digest);

        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
        const MIN_LATENCY: Duration = Duration::from_millis(150);
        const MAX_LATENCY: Duration = Duration::from_secs(3);

        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
        self.metrics
            .sequencing_estimated_latency
            .set(latency.as_millis() as i64);

        let latency = std::cmp::max(latency, MIN_LATENCY);
        let latency = std::cmp::min(latency, MAX_LATENCY);
        let latency = latency * 2;
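        // Example: an observed latency of 1s stays within [150ms, 3s] and is
        // doubled to a 2s delay step; a validator at position 3 then waits
        // 2s * 3 = 6s before submitting (see the multiplication below).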
        let (delay_step, position) =
            self.override_by_max_submit_position_settings(latency, position);

        self.metrics
            .sequencing_resubmission_interval_ms
            .set(delay_step.as_millis() as i64);

        (
            delay_step * position as u32,
            position,
            positions_moved,
            preceding_disconnected,
        )
    }

    /// Overrides the latency and the position if settings are defined for
    /// `max_submit_position` and `submit_delay_step_override`. If
    /// `max_submit_position` has been defined, it will always be used,
    /// irrespective of any decision made so far. The same holds for
    /// `submit_delay_step_override`.
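    ///
    /// For example, with `max_submit_position = Some(1)` a computed position
    /// of 7 is clamped to 1, and with `submit_delay_step_override = Some(2s)`
    /// the delay step is 2s regardless of the estimated latency (see the unit
    /// test at the bottom of this file).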
    fn override_by_max_submit_position_settings(
        &self,
        latency: Duration,
        mut position: usize,
    ) -> (Duration, usize) {
        // Respect any manual override for position and latency from the settings
        if let Some(max_submit_position) = self.max_submit_position {
            position = std::cmp::min(position, max_submit_position);
        }

        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
        (delay_step, position)
    }

    /// Check when this authority should submit the certificate to consensus.
    /// This sorts all authorities based on a pseudo-random distribution
    /// derived from the transaction hash.
    ///
    /// The function targets having 1 consensus transaction submitted per user
    /// transaction when the system operates normally.
    ///
    /// The function returns the position of this authority when it is its
    /// turn to submit the transaction to consensus.
    fn submission_position(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (usize, usize, usize) {
        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);

        self.check_submission_wrt_connectivity_and_scores(positions)
    }

    /// This function runs the following algorithm to decide whether or not to
    /// submit a transaction to consensus.
    ///
    /// It takes in a deterministic list that represents positions of all the
    /// authorities. The authority in the first position will be responsible
    /// for submitting to consensus, and so we check if we are this
    /// validator, and if so, return true.
    ///
    /// If we are not in that position, we check our connectivity to the
    /// authority in that position. If we are connected to them, we can
    /// assume that they are operational and will submit the transaction. If
    /// we are not connected to them, we assume that they are not operational
    /// and we will not rely on that authority to submit the transaction. So
    /// we shift them out of the first position, and run this algorithm
    /// again on the new set of positions.
    ///
    /// This can possibly result in a transaction being submitted twice if an
    /// authority sees a false negative in connectivity to another, such as
    /// in the case of a network partition.
    ///
    /// Recursively, if the authority further ahead of us in the positions is a
    /// low performing authority, we will move our positions up one, and
    /// submit the transaction. This allows maintaining performance overall.
    /// We will only do this part for authorities that are not low performers
    /// themselves to prevent extra amplification in the case that the
    /// positions look like [low_scoring_a1, low_scoring_a2, a3]
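    ///
    /// For example, with positions [a1, us, a3] where a1 appears disconnected,
    /// a1 is filtered out: our position moves from 1 to 0 (positions_moved = 1,
    /// preceding_disconnected = 1) and we submit immediately.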
    fn check_submission_wrt_connectivity_and_scores(
        &self,
        positions: Vec<AuthorityName>,
    ) -> (usize, usize, usize) {
        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
        if low_scoring_authorities.get(&self.authority).is_some() {
            return (positions.len(), 0, 0);
        }
        let initial_position = get_position_in_list(self.authority, positions.clone());
        let mut preceding_disconnected = 0;
        let mut before_our_position = true;

        let filtered_positions: Vec<_> = positions
            .into_iter()
            .filter(|authority| {
                let keep = self.authority == *authority; // don't filter ourself out
                if keep {
                    before_our_position = false;
                }

                // filter out any nodes that appear disconnected
                let connected = self
                    .connection_monitor_status
                    .check_connection(&self.authority, authority)
                    .unwrap_or(ConnectionStatus::Disconnected)
                    == ConnectionStatus::Connected;
                if !connected && before_our_position {
                    preceding_disconnected += 1; // used for metrics
                }

                // Filter out low scoring nodes
                let high_scoring = low_scoring_authorities.get(authority).is_none();

                keep || (connected && high_scoring)
            })
            .collect();

        let position = get_position_in_list(self.authority, filtered_positions);

        (
            position,
            initial_position - position,
            preceding_disconnected,
        )
    }

    /// This method blocks until the transaction is persisted in the local
    /// database. It then returns a handle to an async task; the caller can
    /// join this handle to await until the transaction is processed by
    /// consensus.
    ///
    /// This method guarantees that once submit (but not the returned async
    /// handle) returns, the transaction is persisted and will eventually be
    /// sent to consensus even after a restart.
    ///
    /// When submitting a certificate the caller **must** provide a
    /// ReconfigState lock guard.
    pub fn submit(
        self: &Arc<Self>,
        transaction: ConsensusTransaction,
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<JoinHandle<()>> {
        self.submit_batch(&[transaction], lock, epoch_store)
    }

    pub fn submit_batch(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<JoinHandle<()>> {
        if transactions.len() > 1 {
            // In a soft bundle, we need to check if all transactions are of
            // UserTransaction kind. The check is required because we assume this in
            // submit_and_wait_inner.
            for transaction in transactions {
                fp_ensure!(
                    matches!(
                        transaction.kind,
                        ConsensusTransactionKind::CertifiedTransaction(_)
                    ),
                    IotaError::InvalidTxKindInSoftBundle
                );
            }
        }

        epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
        Ok(self.submit_unchecked(transactions, epoch_store))
    }

    /// Performs weakly consistent checks on internal buffers to quickly
    /// discard transactions if we are overloaded
    pub fn check_limits(&self) -> bool {
        // First check total transactions (waiting and in submission)
        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
            > self.max_pending_transactions
        {
            return false;
        }
        // Then check if submit_semaphore has permits
        self.submit_semaphore.available_permits() > 0
    }

    pub(crate) fn check_consensus_overload(&self) -> IotaResult {
        fp_ensure!(
            self.check_limits(),
            IotaError::TooManyTransactionsPendingConsensus
        );
        Ok(())
    }

    fn submit_unchecked(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> JoinHandle<()> {
        // Reconfiguration lock is dropped when pending_consensus_transactions is
        // persisted, before it is handled by consensus
        let async_stage = self
            .clone()
            .submit_and_wait(transactions.to_vec(), epoch_store.clone());
        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
        // (Limit is not applied atomically, and only to user transactions.)
        let join_handle = spawn_monitored_task!(async_stage);
        join_handle
    }

    async fn submit_and_wait(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        // When the epoch_terminated signal is received, all pending
        // submit_and_wait_inner futures are dropped.
        //
        // This is needed because submit_and_wait_inner waits on read_notify for the
        // consensus message to be processed, which may never happen on an epoch
        // boundary.
        //
        // In addition to that, within_alive_epoch ensures that all pending consensus
        // adapter tasks are stopped before reconfiguration can proceed.
        //
        // This is essential because consensus workers reuse the same ports when
        // consensus restarts, which means we might send transactions from the
        // previous epoch to the consensus instance of the new epoch without this
        // barrier.
        epoch_store
            .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
            .await
            .ok(); // the result indicates whether the epoch ended earlier; we
        // don't care about it here
    }

    async fn submit_and_wait_inner(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        if transactions.is_empty() {
            return;
        }

        // Current code path ensures:
        // - If transactions.len() > 1, it is a soft bundle. Otherwise transactions
        //   should have been submitted individually.
        // - If is_soft_bundle, then all transactions are of UserTransaction kind.
        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and
        //   transactions[0] can be of any kind.
        let is_soft_bundle = transactions.len() > 1;

        let mut transaction_keys = Vec::new();

        for transaction in &transactions {
            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
                epoch_store.record_epoch_pending_certs_process_time_metric();
            }

            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
            transaction_keys.push(transaction_key);
        }
        let tx_type = if !is_soft_bundle {
            classify(&transactions[0])
        } else {
            "soft_bundle"
        };

        let mut guard = InflightDropGuard::acquire(&self, tx_type);

        // Create the waiter until the node's turn comes to submit to consensus
        let (await_submit, position, positions_moved, preceding_disconnected) =
            self.await_submit_delay(epoch_store.committee(), &transactions[..]);

        // Create the waiter until the transaction is processed by consensus or via
        // checkpoint
        let processed_via_consensus_or_checkpoint =
            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
        pin_mut!(processed_via_consensus_or_checkpoint);

        let processed_waiter = tokio::select! {
            // We need to wait for some delay until we submit the transaction to consensus
            _ = await_submit => Some(processed_via_consensus_or_checkpoint),

            // If epoch ends, don't wait for submit delay
            _ = epoch_store.user_certs_closed_notify() => {
                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
                Some(processed_via_consensus_or_checkpoint)
            }

            // If transaction is received by consensus or checkpoint while we wait, we are done.
            _ = &mut processed_via_consensus_or_checkpoint => {
                None
            }
        };

        // Log warnings for administrative transactions that fail to get sequenced
        let _monitor = if !is_soft_bundle
            && matches!(
                transactions[0].kind,
                ConsensusTransactionKind::EndOfPublish(_)
                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
            ) {
            let transaction_keys = transaction_keys.clone();
            Some(CancelOnDrop(spawn_monitored_task!(async {
                let mut i = 0u64;
                loop {
                    i += 1;
                    const WARN_DELAY_S: u64 = 30;
                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                    let total_wait = i * WARN_DELAY_S;
                    warn!(
                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
                        total_wait, transaction_keys
                    );
                }
            })))
        } else {
            None
        };

        if let Some(processed_waiter) = processed_waiter {
            debug!("Submitting {:?} to consensus", transaction_keys);

            // populate the position only when this authority submits the transaction
            // to consensus
            guard.position = Some(position);
            guard.positions_moved = Some(positions_moved);
            guard.preceding_disconnected = Some(preceding_disconnected);

            let _permit: SemaphorePermit = self
                .submit_semaphore
                .acquire()
                .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
                .await
                .expect("Consensus adapter does not close semaphore");
            let _in_flight_submission_guard =
                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

            // We enter this branch when, in the select above, await_submit completed
            // while processed_waiter is still pending. This means it is time for us
            // to submit the transaction to consensus.
            let submit_inner = async {
                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

                loop {
                    // Submit the transaction to consensus and return the submit result with a
                    // status waiter
                    let status_waiter = self
                        .submit_inner(
                            &transactions,
                            epoch_store,
                            &transaction_keys,
                            tx_type,
                            is_soft_bundle,
                        )
                        .await;

                    match status_waiter.await {
                        Ok(BlockStatusInternal::Sequenced) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "sequenced"])
                                .inc();
                            // The block has been sequenced. Nothing more to do: we have guarantees
                            // that the transaction will appear in consensus output.
                            trace!(
                                "Transaction {transaction_keys:?} has been sequenced by consensus."
                            );
                            break;
                        }
                        Ok(BlockStatusInternal::GarbageCollected) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "garbage_collected"])
                                .inc();
                            // The block has been garbage collected and we have no guarantees that
                            // the transaction will appear in consensus output, so we'll
                            // resubmit the transaction to consensus. If the transaction has
                            // already been "processed", then probably someone else submitted
                            // the transaction and managed to get it sequenced. In that case this
                            // future will have been cancelled anyway, so there is no need to
                            // check the processed output here.
                            debug!(
                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                        Err(err) => {
                            warn!(
                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                                err
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                    }
                }
            };

            guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
                Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
                Either::Right(((), processed_waiter)) => {
                    debug!("Submitted {transaction_keys:?} to consensus");
                    processed_waiter.await
                }
            };
        }
        debug!("{transaction_keys:?} processed by consensus");

        let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
        epoch_store
            .remove_pending_consensus_transactions(&consensus_keys)
            .expect("Storage error when removing consensus transaction");

        let is_user_tx = is_soft_bundle
            || matches!(
                transactions[0].kind,
                ConsensusTransactionKind::CertifiedTransaction(_)
            );
        let send_end_of_publish = if is_user_tx {
            // If we are in RejectUserCerts state and we just drained the list, we need to
            // send EndOfPublish to signal other validators that we are not submitting more
            // certificates to the epoch. Note that there could be a race
            // condition here where we enter this check in RejectAllCerts state.
            // In that case we don't need to send EndOfPublish, because the condition to
            // enter RejectAllCerts is that 2f+1 other validators already sequenced their
            // EndOfPublish messages. Also note that we could send multiple
            // EndOfPublish messages, because multiple tasks can enter here with
            // pending_count == 0. This doesn't affect correctness.
            if epoch_store
                .get_reconfig_state_read_lock_guard()
                .is_reject_user_certs()
            {
                let pending_count = epoch_store.pending_consensus_certificates_count();
                debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
                pending_count == 0 // send end of epoch if empty
            } else {
                false
            }
        } else {
            false
        };
        if send_end_of_publish {
            // sending message outside of any locks scope
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
        self.metrics
            .sequencing_certificate_success
            .with_label_values(&[tx_type])
            .inc();
    }

    async fn submit_inner(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        tx_type: &str,
        is_soft_bundle: bool,
    ) -> BlockStatusReceiver {
        let ack_start = Instant::now();
        let mut retries: u32 = 0;

        let status_waiter = loop {
            match self
                .consensus_client
                .submit(transactions, epoch_store)
                .await
            {
                Err(err) => {
                    // This can happen during reconfig, or when consensus has full internal buffers
                    // and needs to apply backpressure, so retry a few times before logging
                    // warnings.
                    if retries > 30
                        || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
                    {
                        warn!(
                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
                        );
                    }
                    self.metrics
                        .sequencing_certificate_failures
                        .with_label_values(&[tx_type])
                        .inc();
                    retries += 1;

                    if !is_soft_bundle && transactions[0].kind.is_dkg() {
                        // Shorter delay for DKG messages, which are time-sensitive and happen at
                        // start-of-epoch when submit errors due to active reconfig are likely.
                        time::sleep(Duration::from_millis(100)).await;
                    } else {
                        time::sleep(Duration::from_secs(10)).await;
                    };
                }
                Ok(status_waiter) => {
                    break status_waiter;
                }
            }
        };

        // We want to record the number of retries when reporting latency, but to
        // avoid label cardinality we do some simple bucketing that still gives a
        // good enough idea of how many retries were associated with the latency.
        let bucket = match retries {
            0..=10 => retries.to_string(), // just report the retry count as is
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        };
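        // e.g. retries = 15 falls in 11..=20 and is reported as "between_10_and_20"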

        self.metrics
            .sequencing_acknowledge_latency
            .with_label_values(&[bucket.as_str(), tx_type])
            .observe(ack_start.elapsed().as_secs_f64());

        status_waiter
    }

    /// Waits for transactions to either appear in consensus output or be
    /// executed via a checkpoint (state sync).
    /// Returns the processed method: whether the transactions have been
    /// processed via consensus or synced via a checkpoint.
    async fn await_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            let transaction_digests = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::Certificate(digest),
            ) = transaction_key
            {
                vec![digest]
            } else {
                vec![]
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to get
                // notified when a checkpoint with an equal or higher sequence
                // number has already been synced. This way we don't try to unnecessarily
                // sequence the signature for an already verified checkpoint.
                Either::Left(
                    self.checkpoint_store
                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
                )
            } else {
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus or
            // executed in a checkpoint. We could equally just get notified in
            // aggregate when all transactions are processed, but with this approach we
            // can get notified in a more fine-grained way, as transactions can be
            // marked as processed in different ways. This is mostly a concern for the
            // soft-bundle transactions.
            notifications.push(async move {
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    _ = checkpoint_synced_future => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

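        // If any transaction in the batch was processed via checkpoint, report
        // Checkpoint; Consensus is reported only when every transaction was
        // observed in consensus output.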
        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
}

impl CheckConnection for ConnectionMonitorStatus {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        if ourself == authority {
            return Some(ConnectionStatus::Connected);
        }

        let mapping = self.authority_names_to_peer_ids.load_full();
        let peer_id = match mapping.get(authority) {
            Some(p) => p,
            None => {
                warn!(
                    "failed to find peer {:?} in connection monitor listener",
                    authority
                );
                return None;
            }
        };

        let res = match self.connection_statuses.try_get(peer_id) {
            TryResult::Present(c) => Some(c.value().clone()),
            TryResult::Absent => None,
            TryResult::Locked => {
                // update is in progress, assume the status is still or becoming disconnected
                Some(ConnectionStatus::Disconnected)
            }
        };
        res
    }

    fn update_mapping_for_epoch(
        &self,
        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
        self.authority_names_to_peer_ids
            .swap(Arc::new(authority_names_to_peer_ids));
    }
}

impl CheckConnection for ConnectionMonitorStatusForTests {
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }

    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}

pub fn get_position_in_list(
    search_authority: AuthorityName,
    positions: Vec<AuthorityName>,
) -> usize {
    positions
        .into_iter()
        .find_position(|authority| *authority == search_authority)
        .expect("Couldn't find ourselves in shuffled committee")
        .0
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// This method is called externally to begin reconfiguration.
    /// It transitions the reconfig state to reject new certificates from
    /// users. ConsensusAdapter will send an EndOfPublish message once the
    /// pending certificate queue is drained.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        let send_end_of_publish = {
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Allow caller to call this method multiple times
                return;
            }
            let pending_count = epoch_store.pending_consensus_certificates_count();
            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
            let send_end_of_publish = pending_count == 0;
            epoch_store.close_user_certs(reconfig_guard);
            send_end_of_publish
            // reconfig_guard lock is dropped here.
        };
        if send_end_of_publish {
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
    }
}

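/// Wrapper that aborts the wrapped task when dropped. Used above to stop the
/// "still waiting" warning loop once its transaction completes.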
struct CancelOnDrop<T>(JoinHandle<T>);

impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

/// Tracks number of inflight consensus requests and relevant metrics
struct InflightDropGuard<'a> {
    adapter: &'a ConsensusAdapter,
    start: Instant,
    position: Option<usize>,
    positions_moved: Option<usize>,
    preceding_disconnected: Option<usize>,
    tx_type: &'static str,
    processed_method: ProcessedMethod,
}

#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    Consensus,
    Checkpoint,
}

impl<'a> InflightDropGuard<'a> {
    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
        adapter
            .num_inflight_transactions
            .fetch_add(1, Ordering::SeqCst);
        adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[tx_type])
            .inc();
        adapter
            .metrics
            .sequencing_certificate_attempt
            .with_label_values(&[tx_type])
            .inc();
        Self {
            adapter,
            start: Instant::now(),
            position: None,
            positions_moved: None,
            preceding_disconnected: None,
            tx_type,
            processed_method: ProcessedMethod::Consensus,
        }
    }
}

impl Drop for InflightDropGuard<'_> {
    fn drop(&mut self) {
        self.adapter
            .num_inflight_transactions
            .fetch_sub(1, Ordering::SeqCst);
        self.adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[self.tx_type])
            .dec();

        let position = if let Some(position) = self.position {
            self.adapter
                .metrics
                .sequencing_certificate_authority_position
                .observe(position as f64);
            position.to_string()
        } else {
            "not_submitted".to_string()
        };

        if let Some(positions_moved) = self.positions_moved {
            self.adapter
                .metrics
                .sequencing_certificate_positions_moved
                .observe(positions_moved as f64);
        };

        if let Some(preceding_disconnected) = self.preceding_disconnected {
            self.adapter
                .metrics
                .sequencing_certificate_preceding_disconnected
                .observe(preceding_disconnected as f64);
        };

        let latency = self.start.elapsed();
        let processed_method = match self.processed_method {
            ProcessedMethod::Consensus => "processed_via_consensus",
            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
        };
        self.adapter
            .metrics
            .sequencing_certificate_latency
            .with_label_values(&[position.as_str(), self.tx_type, processed_method])
            .observe(latency.as_secs_f64());

        // Only sample latency after consensus quorum is up. Otherwise, the wait for
        // consensus quorum at the beginning of an epoch can distort the sampled
        // latencies. Technically there are more system transaction types that
        // can be included in samples after the first consensus commit, but this
        // set of types should be enough.
        if self.position == Some(0) {
            // Transaction types below require quorum existed in the current epoch.
            // TODO: refactor tx_type to enum.
            let sampled = matches!(
                self.tx_type,
                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
            );
            // If the tx has been processed by checkpoint state sync, it is excluded
            // from the latency calculations, as this can lead to misleading results.
            if sampled && self.processed_method == ProcessedMethod::Consensus {
                self.adapter.latency_observer.report(latency);
            }
        }
    }
}

impl SubmitToConsensus for Arc<ConsensusAdapter> {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        self.submit_batch(transactions, None, epoch_store)
            .map(|_| ())
    }
}

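/// Returns the index of `ourselves` in the stake-weighted shuffle of the
/// committee for `tx_digest`; position 0 means this authority submits first.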
pub fn position_submit_certificate(
    committee: &Committee,
    ourselves: &AuthorityName,
    tx_digest: &TransactionDigest,
) -> usize {
    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
    get_position_in_list(*ourselves, validators)
}

#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        checkpoints::CheckpointStore,
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        mysticeti_adapter::LazyMysticetiClient,
    };

    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // When we define a max submit position and a delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        // transaction to submit
        let tx_digest = TransactionDigest::generate(&mut rng);

        // Ensure that the original position is higher
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert!(!positions_moved > 0);

        // Make sure that the position is capped at the max_submit_position (1)
        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert!(!positions_moved > 0);

        // Without max submit position and delay step overrides
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        // total delay = default latency (1s) * 2 * position (7) = 14s
        assert_eq!(delay_step, Duration::from_secs(14));
        assert!(!positions_moved > 0);
    }

    #[test]
    fn test_position_submit_certificate() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // generate random transaction digests, and account for validator selection
        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // One and only one validator gets position 0
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}