iota_core/consensus_adapter.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    future::Future,
8    ops::Deref,
9    sync::{
10        Arc,
11        atomic::{AtomicU64, Ordering},
12    },
13    time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use consensus_core::BlockStatus;
18use dashmap::{DashMap, try_result::TryResult};
19use futures::{
20    FutureExt, StreamExt,
21    future::{self, Either, select},
22    pin_mut,
23    stream::FuturesUnordered,
24};
25use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
26use iota_simulator::anemo::PeerId;
27use iota_types::{
28    base_types::{AuthorityName, TransactionDigest},
29    committee::Committee,
30    error::{IotaError, IotaResult},
31    fp_ensure,
32    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
33};
34use itertools::Itertools;
35use parking_lot::RwLockReadGuard;
36use prometheus::{
37    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
38    register_histogram_vec_with_registry, register_histogram_with_registry,
39    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
40    register_int_gauge_with_registry,
41};
42use tokio::{
43    sync::{Semaphore, SemaphorePermit, oneshot},
44    task::JoinHandle,
45    time::{
46        self, Duration,
47    },
48};
49use tracing::{debug, info, trace, warn};
50
51use crate::{
52    authority::authority_per_epoch_store::AuthorityPerEpochStore,
53    connection_monitor::ConnectionStatus,
54    consensus_handler::{SequencedConsensusTransactionKey, classify},
55    epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
56    metrics::LatencyObserver,
57};
58
59#[cfg(test)]
60#[path = "unit_tests/consensus_tests.rs"]
61pub mod consensus_tests;
62
63const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
64    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
65];
66
67pub struct ConsensusAdapterMetrics {
68    // Certificate sequencing metrics
69    pub sequencing_certificate_attempt: IntCounterVec,
70    pub sequencing_certificate_success: IntCounterVec,
71    pub sequencing_certificate_failures: IntCounterVec,
72    pub sequencing_certificate_status: IntCounterVec,
73    pub sequencing_certificate_inflight: IntGaugeVec,
74    pub sequencing_acknowledge_latency: HistogramVec,
75    pub sequencing_certificate_latency: HistogramVec,
76    pub sequencing_certificate_authority_position: Histogram,
77    pub sequencing_certificate_positions_moved: Histogram,
78    pub sequencing_certificate_preceding_disconnected: Histogram,
79    pub sequencing_certificate_processed: IntCounterVec,
80    pub sequencing_in_flight_semaphore_wait: IntGauge,
81    pub sequencing_in_flight_submissions: IntGauge,
82    pub sequencing_estimated_latency: IntGauge,
83    pub sequencing_resubmission_interval_ms: IntGauge,
84}
85
86impl ConsensusAdapterMetrics {
87    pub fn new(registry: &Registry) -> Self {
88        Self {
89            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
90                "sequencing_certificate_attempt",
91                "Counts the number of certificates the validator attempts to sequence.",
92                &["tx_type"],
93                registry,
94            )
95            .unwrap(),
96            sequencing_certificate_success: register_int_counter_vec_with_registry!(
97                "sequencing_certificate_success",
98                "Counts the number of successfully sequenced certificates.",
99                &["tx_type"],
100                registry,
101            )
102            .unwrap(),
103            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
104                "sequencing_certificate_failures",
105                "Counts the number of sequenced certificates that failed other than by timeout.",
106                &["tx_type"],
107                registry,
108            )
109            .unwrap(),
110            sequencing_certificate_status: register_int_counter_vec_with_registry!(
111                "sequencing_certificate_status",
112                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
113                &["tx_type", "status"],
114                registry,
115            )
116            .unwrap(),
117            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
118                "sequencing_certificate_inflight",
119                "The inflight requests to sequence certificates.",
120                &["tx_type"],
121                registry,
122            )
123            .unwrap(),
124            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
125                "sequencing_acknowledge_latency",
126                "The latency for acknowledgement from the sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric.",
127                &["retry", "tx_type"],
128                LATENCY_SEC_BUCKETS.to_vec(),
129                registry,
130            )
131            .unwrap(),
132            sequencing_certificate_latency: register_histogram_vec_with_registry!(
133                "sequencing_certificate_latency",
134                "The latency for sequencing a certificate.",
135                &["position", "tx_type", "processed_method"],
136                LATENCY_SEC_BUCKETS.to_vec(),
137                registry,
138            )
139            .unwrap(),
140            sequencing_certificate_authority_position: register_histogram_with_registry!(
141                "sequencing_certificate_authority_position",
142                "The position of the authority when it submitted a certificate to consensus.",
143                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
144                registry,
145            )
146            .unwrap(),
147            sequencing_certificate_positions_moved: register_histogram_with_registry!(
148                "sequencing_certificate_positions_moved",
149                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
150                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
151                registry,
152            )
153            .unwrap(),
154            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
155                "sequencing_certificate_preceding_disconnected",
156                "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
157                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
158                registry,
159            )
160            .unwrap(),
161            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
162                "sequencing_certificate_processed",
163                "The number of certificates that have been processed either by consensus or checkpoint.",
164                &["source"],
165                registry
166            )
167            .unwrap(),
168            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
169                "sequencing_in_flight_semaphore_wait",
170                "How many requests are blocked on submit_permit.",
171                registry,
172            )
173            .unwrap(),
174            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
175                "sequencing_in_flight_submissions",
176                "Number of transactions submitted to local consensus instance and not yet sequenced",
177                registry,
178            )
179            .unwrap(),
180            sequencing_estimated_latency: register_int_gauge_with_registry!(
181                "sequencing_estimated_latency",
182                "Consensus latency estimated by consensus adapter in milliseconds",
183                registry,
184            )
185            .unwrap(),
186            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
187                "sequencing_resubmission_interval_ms",
188                "Resubmission interval used by consensus adapter in milliseconds",
189                registry,
190            )
191            .unwrap(),
192        }
193    }
194
195    pub fn new_test() -> Self {
196        Self::new(&Registry::default())
197    }
198}
199
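/// Receives the status of a submitted block from consensus: either sequenced, or
/// garbage collected before it could be sequenced.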
200pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
201
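/// Interface for submitting a batch of transactions to consensus without waiting
/// for them to be sequenced.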
202#[mockall::automock]
203pub trait SubmitToConsensus: Sync + Send + 'static {
204    fn submit_to_consensus(
205        &self,
206        transactions: &[ConsensusTransaction],
207        epoch_store: &Arc<AuthorityPerEpochStore>,
208    ) -> IotaResult;
209}
210
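/// Client interface to the local consensus instance. `submit` returns a receiver
/// that resolves with the status of the block containing the transactions.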
211#[mockall::automock]
212#[async_trait::async_trait]
213pub trait ConsensusClient: Sync + Send + 'static {
214    async fn submit(
215        &self,
216        transactions: &[ConsensusTransaction],
217        epoch_store: &Arc<AuthorityPerEpochStore>,
218    ) -> IotaResult<BlockStatusReceiver>;
219}
220
221/// Submit IOTA certificates to the consensus.
222pub struct ConsensusAdapter {
223    /// The network client connecting to the consensus node of this authority.
224    consensus_client: Arc<dyn ConsensusClient>,
225    /// Authority pubkey.
226    authority: AuthorityName,
227    /// The limit on the number of inflight transactions at this node.
228    max_pending_transactions: usize,
229    /// Number of submitted transactions still inflight at this node.
230    num_inflight_transactions: AtomicU64,
231    /// Dictates the maximum position from which this authority will submit to
232    /// consensus. Even if it is elected to submit from a higher position than
233    /// this, it will "reset" to the max_submit_position.
234    max_submit_position: Option<usize>,
235    /// When provided it will override the current back off logic and will use
236    /// this value instead as delay step.
237    submit_delay_step_override: Option<Duration>,
238    /// A structure to check the connection statuses populated by the Connection
239    /// Monitor Listener
240    connection_monitor_status: Arc<dyn CheckConnection>,
241    /// A structure to check the reputation scores populated by Consensus
242    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
243    /// A structure to register metrics
244    metrics: ConsensusAdapterMetrics,
245    /// Semaphore limiting parallel submissions to consensus
246    submit_semaphore: Semaphore,
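    /// Tracks recently observed consensus latencies, used to estimate the delay
    /// before this authority submits to consensus.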
247    latency_observer: LatencyObserver,
248}
249
250pub trait CheckConnection: Send + Sync {
251    fn check_connection(
252        &self,
253        ourself: &AuthorityName,
254        authority: &AuthorityName,
255    ) -> Option<ConnectionStatus>;
256    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
257}
258
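/// Implementation of CheckConnection backed by the connection monitor's statuses
/// and the authority-name-to-peer-id mapping.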
259pub struct ConnectionMonitorStatus {
260    /// Current connection statuses forwarded from the connection monitor
261    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
262    /// A map from authority name to peer id
263    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
264}
265
266pub struct ConnectionMonitorStatusForTests {}
267
268impl ConsensusAdapter {
269    /// Make a new Consensus adapter instance.
270    pub fn new(
271        consensus_client: Arc<dyn ConsensusClient>,
272        authority: AuthorityName,
273        connection_monitor_status: Arc<dyn CheckConnection>,
274        max_pending_transactions: usize,
275        max_pending_local_submissions: usize,
276        max_submit_position: Option<usize>,
277        submit_delay_step_override: Option<Duration>,
278        metrics: ConsensusAdapterMetrics,
279    ) -> Self {
280        let num_inflight_transactions = Default::default();
281        let low_scoring_authorities =
282            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
283        Self {
284            consensus_client,
285            authority,
286            max_pending_transactions,
287            max_submit_position,
288            submit_delay_step_override,
289            num_inflight_transactions,
290            connection_monitor_status,
291            low_scoring_authorities,
292            metrics,
293            submit_semaphore: Semaphore::new(max_pending_local_submissions),
294            latency_observer: LatencyObserver::new(),
295        }
296    }
297
298    pub fn swap_low_scoring_authorities(
299        &self,
300        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
301    ) {
302        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
303    }
304
305    // TODO - this probably needs to hold some kind of lock to make sure the epoch
306    // does not change while we are recovering
307    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
308        // Currently consensus worker might lose transactions on restart, so we need to
309        // resend them.
310        // TODO: get_all_pending_consensus_transactions is called
311        // twice when initializing AuthorityPerEpochStore and here, should not
312        // be a big deal but can be optimized
313        let mut recovered = epoch_store.get_all_pending_consensus_transactions();
314
315        #[expect(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
316        if epoch_store
317            .get_reconfig_state_read_lock_guard()
318            .is_reject_user_certs()
319            && epoch_store.pending_consensus_certificates_empty()
320        {
321            if recovered
322                .iter()
323                .any(ConsensusTransaction::is_end_of_publish)
324            {
325                // There are two cases when this is needed
326                // (1) We send EndOfPublish message after removing pending certificates in
327                // submit_and_wait_inner It is possible that node will crash
328                // between those two steps, in which case we might need to
329                // re-introduce EndOfPublish message on restart
330                // (2) If node crashed inside ConsensusAdapter::close_epoch,
331                // after reconfig lock state was written to DB and before we persisted
332                // EndOfPublish message
333                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
334            }
335        }
336        debug!(
337            "Submitting {:?} recovered pending consensus transactions to consensus",
338            recovered.len()
339        );
340        for transaction in recovered {
341            if transaction.is_end_of_publish() {
342                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
343            }
344            self.submit_unchecked(&[transaction], epoch_store);
345        }
346    }
347
348    fn await_submit_delay(
349        &self,
350        committee: &Committee,
351        transactions: &[ConsensusTransaction],
352    ) -> (impl Future<Output = ()>, usize, usize, usize) {
353        // Use the minimum digest to compute submit delay.
354        let min_digest = transactions
355            .iter()
356            .filter_map(|tx| match &tx.kind {
357                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
358                    Some(certificate.digest())
359                }
360                _ => None,
361            })
362            .min();
363
364        let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
365            Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
366            _ => (Duration::ZERO, 0, 0, 0),
367        };
368        (
369            tokio::time::sleep(duration),
370            position,
371            positions_moved,
372            preceding_disconnected,
373        )
374    }
375
376    fn await_submit_delay_user_transaction(
377        &self,
378        committee: &Committee,
379        tx_digest: &TransactionDigest,
380    ) -> (Duration, usize, usize, usize) {
381        let (position, positions_moved, preceding_disconnected) =
382            self.submission_position(committee, tx_digest);
383
384        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
385        const MIN_LATENCY: Duration = Duration::from_millis(150);
386        const MAX_LATENCY: Duration = Duration::from_secs(3);
387
388        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
389        self.metrics
390            .sequencing_estimated_latency
391            .set(latency.as_millis() as i64);
392
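        // Clamp the estimated latency to [MIN_LATENCY, MAX_LATENCY] and double it to
        // obtain the per-position delay step, e.g. with the 1s default latency and no
        // overrides, the authority at position 3 waits 2s * 3 = 6s before submitting.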
393        let latency = std::cmp::max(latency, MIN_LATENCY);
394        let latency = std::cmp::min(latency, MAX_LATENCY);
395        let latency = latency * 2;
396        let (delay_step, position) =
397            self.override_by_max_submit_position_settings(latency, position);
398
399        self.metrics
400            .sequencing_resubmission_interval_ms
401            .set(delay_step.as_millis() as i64);
402
403        (
404            delay_step * position as u32,
405            position,
406            positions_moved,
407            preceding_disconnected,
408        )
409    }
410
411    /// Overrides the latency and the position if settings are defined for
412    /// `max_submit_position` and `submit_delay_step_override`. If
413    /// `max_submit_position` has been defined, then that will always be used
414    /// irrespective of any decision made so far. The same holds for
415    /// `submit_delay_step_override`.
416    fn override_by_max_submit_position_settings(
417        &self,
418        latency: Duration,
419        mut position: usize,
420    ) -> (Duration, usize) {
421        // Respect any manual override for position and latency from the settings
422        if let Some(max_submit_position) = self.max_submit_position {
423            position = std::cmp::min(position, max_submit_position);
424        }
425
426        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
427        (delay_step, position)
428    }
429
430    /// Checks when this authority should submit the certificate to consensus.
431    /// This sorts all authorities based on a pseudo-random distribution derived
432    /// from the transaction hash.
433    ///
434    /// The function targets having 1 consensus transaction submitted per user
435    /// transaction when the system operates normally.
436    ///
437    /// The function returns the position of this authority when it is its turn
438    /// to submit the transaction to consensus.
439    fn submission_position(
440        &self,
441        committee: &Committee,
442        tx_digest: &TransactionDigest,
443    ) -> (usize, usize, usize) {
444        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
445
446        self.check_submission_wrt_connectivity_and_scores(positions)
447    }
448
449    /// This function runs the following algorithm to decide whether or not to
450    /// submit a transaction to consensus.
451    ///
452    /// It takes in a deterministic list that represents the positions of all the
453    /// authorities. The authority in the first position will be responsible
454    /// for submitting to consensus, and so we check if we are this validator,
455    /// and if so, we take position 0 and submit without delay.
456    ///
457    /// If we are not in that position, we check our connectivity to the
458    /// authority in that position. If we are connected to them, we can
459    /// assume that they are operational and will submit the transaction. If
460    /// we are not connected to them, we assume that they are not operational
461    /// and we will not rely on that authority to submit the transaction. So
462    /// we shift them out of the first position, and run this algorithm
463    /// again on the new set of positions.
464    ///
465    /// This can possibly result in a transaction being submitted twice if an
466    /// authority sees a false negative in connectivity to another, such as
467    /// in the case of a network partition.
468    ///
469    /// Recursively, if the authority further ahead of us in the positions is a
470    /// low performing authority, we will move our positions up one, and
471    /// submit the transaction. This allows maintaining performance overall.
472    /// We will only do this part for authorities that are not low performers
473    /// themselves to prevent extra amplification in the case that the
474    /// positions look like [low_scoring_a1, low_scoring_a2, a3]
475    fn check_submission_wrt_connectivity_and_scores(
476        &self,
477        positions: Vec<AuthorityName>,
478    ) -> (usize, usize, usize) {
479        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
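        // If this authority is itself low-scoring, place it at the end of the
        // submission order (position == committee size).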
480        if low_scoring_authorities.get(&self.authority).is_some() {
481            return (positions.len(), 0, 0);
482        }
483        let initial_position = get_position_in_list(self.authority, positions.clone());
484        let mut preceding_disconnected = 0;
485        let mut before_our_position = true;
486
487        let filtered_positions: Vec<_> = positions
488            .into_iter()
489            .filter(|authority| {
490                let keep = self.authority == *authority; // don't filter ourself out
491                if keep {
492                    before_our_position = false;
493                }
494
495                // filter out any nodes that appear disconnected
496                let connected = self
497                    .connection_monitor_status
498                    .check_connection(&self.authority, authority)
499                    .unwrap_or(ConnectionStatus::Disconnected)
500                    == ConnectionStatus::Connected;
501                if !connected && before_our_position {
502                    preceding_disconnected += 1; // used for metrics
503                }
504
505                // Filter out low scoring nodes
506                let high_scoring = low_scoring_authorities.get(authority).is_none();
507
508                keep || (connected && high_scoring)
509            })
510            .collect();
511
512        let position = get_position_in_list(self.authority, filtered_positions);
513
514        (
515            position,
516            initial_position - position,
517            preceding_disconnected,
518        )
519    }
520
521    /// This method blocks until the transaction is persisted in the local database.
522    /// It then returns a handle to an async task; the caller can join this handle
523    /// to await while the transaction is processed by consensus.
524    ///
525    /// This method guarantees that once submit (but not the returned async handle)
526    /// returns, the transaction is persisted and will eventually be sent to
527    /// consensus, even after a restart.
528    ///
529    /// When submitting a certificate, the caller **must** provide a ReconfigState
530    /// lock guard.
531    pub fn submit(
532        self: &Arc<Self>,
533        transaction: ConsensusTransaction,
534        lock: Option<&RwLockReadGuard<ReconfigState>>,
535        epoch_store: &Arc<AuthorityPerEpochStore>,
536    ) -> IotaResult<JoinHandle<()>> {
537        self.submit_batch(&[transaction], lock, epoch_store)
538    }
539
540    pub fn submit_batch(
541        self: &Arc<Self>,
542        transactions: &[ConsensusTransaction],
543        lock: Option<&RwLockReadGuard<ReconfigState>>,
544        epoch_store: &Arc<AuthorityPerEpochStore>,
545    ) -> IotaResult<JoinHandle<()>> {
546        if transactions.len() > 1 {
547            // In soft bundle, we need to check if all transactions are of UserTransaction
548            // kind. The check is required because we assume this in
549            // submit_and_wait_inner.
550            for transaction in transactions {
551                fp_ensure!(
552                    matches!(
553                        transaction.kind,
554                        ConsensusTransactionKind::CertifiedTransaction(_)
555                    ),
556                    IotaError::InvalidTxKindInSoftBundle
557                );
558            }
559        }
560
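        // Persist the transactions as pending before submitting, so they can be
        // recovered and resubmitted after a restart (see submit_recovered).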
561        epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
562        Ok(self.submit_unchecked(transactions, epoch_store))
563    }
564
565    /// Performs weakly consistent checks on internal buffers to quickly
566    /// discard transactions if we are overloaded
567    pub fn check_limits(&self) -> bool {
568        // First check total transactions (waiting and in submission)
569        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
570            > self.max_pending_transactions
571        {
572            return false;
573        }
574        // Then check if submit_semaphore has permits
575        self.submit_semaphore.available_permits() > 0
576    }
577
578    pub(crate) fn check_consensus_overload(&self) -> IotaResult {
579        fp_ensure!(
580            self.check_limits(),
581            IotaError::TooManyTransactionsPendingConsensus
582        );
583        Ok(())
584    }
585
586    fn submit_unchecked(
587        self: &Arc<Self>,
588        transactions: &[ConsensusTransaction],
589        epoch_store: &Arc<AuthorityPerEpochStore>,
590    ) -> JoinHandle<()> {
591        // Reconfiguration lock is dropped when pending_consensus_transactions is
592        // persisted, before it is handled by consensus
593        let async_stage = self
594            .clone()
595            .submit_and_wait(transactions.to_vec(), epoch_store.clone());
596        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
597        // (Limit is not applied atomically, and only to user transactions.)
598        let join_handle = spawn_monitored_task!(async_stage);
599        join_handle
600    }
601
602    async fn submit_and_wait(
603        self: Arc<Self>,
604        transactions: Vec<ConsensusTransaction>,
605        epoch_store: Arc<AuthorityPerEpochStore>,
606    ) {
607        // When the epoch_terminated signal is received, all pending
608        // submit_and_wait_inner futures are dropped.
609        //
610        // This is needed because submit_and_wait_inner waits on read_notify for
611        // consensus message to be processed, which may never happen on epoch
612        // boundary.
613        //
614        // In addition to that, within_alive_epoch ensures that all pending consensus
615        // adapter tasks are stopped before reconfiguration can proceed.
616        //
617        // This is essential because consensus workers reuse the same ports when
618        // consensus restarts, which means we might send transactions from previous
619        // epochs to the consensus of the new epoch without this barrier.
620        epoch_store
621            .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
622            .await
623            .ok(); // result here indicates if epoch ended earlier, we don't
624        // care about it
625    }
626
627    async fn submit_and_wait_inner(
628        self: Arc<Self>,
629        transactions: Vec<ConsensusTransaction>,
630        epoch_store: &Arc<AuthorityPerEpochStore>,
631    ) {
632        if transactions.is_empty() {
633            return;
634        }
635
636        // Current code path ensures:
637        // - If transactions.len() > 1, it is a soft bundle. Otherwise transactions
638        //   should have been submitted individually.
639        // - If is_soft_bundle, then all transactions are of UserTransaction kind.
640        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and
641        //   transactions[0] can be of any kind.
642        let is_soft_bundle = transactions.len() > 1;
643
644        let mut transaction_keys = Vec::new();
645
646        for transaction in &transactions {
647            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
648                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
649                epoch_store.record_epoch_pending_certs_process_time_metric();
650            }
651
652            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
653            transaction_keys.push(transaction_key);
654        }
655        let tx_type = if !is_soft_bundle {
656            classify(&transactions[0])
657        } else {
658            "soft_bundle"
659        };
660
661        let mut guard = InflightDropGuard::acquire(&self, tx_type);
662
663        // Create the waiter until the node's turn comes to submit to consensus
664        let (await_submit, position, positions_moved, preceding_disconnected) =
665            self.await_submit_delay(epoch_store.committee(), &transactions[..]);
666
667        // Create the waiter until the transaction is processed by consensus or via
668        // checkpoint
669        let processed_via_consensus_or_checkpoint =
670            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
671        pin_mut!(processed_via_consensus_or_checkpoint);
672
673        let processed_waiter = tokio::select! {
674            // We need to wait for some delay until we submit transaction to the consensus
675            _ = await_submit => Some(processed_via_consensus_or_checkpoint),
676
677            // If epoch ends, don't wait for submit delay
678            _ = epoch_store.user_certs_closed_notify() => {
679                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
680                Some(processed_via_consensus_or_checkpoint)
681            }
682
683            // If transaction is received by consensus or checkpoint while we wait, we are done.
684            _ = &mut processed_via_consensus_or_checkpoint => {
685                None
686            }
687        };
688
689        // Log warnings for administrative transactions that fail to get sequenced
690        let _monitor = if !is_soft_bundle
691            && matches!(
692                transactions[0].kind,
693                ConsensusTransactionKind::EndOfPublish(_)
694                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
695                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
696                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
697            ) {
698            let transaction_keys = transaction_keys.clone();
699            Some(CancelOnDrop(spawn_monitored_task!(async {
700                let mut i = 0u64;
701                loop {
702                    i += 1;
703                    const WARN_DELAY_S: u64 = 30;
704                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
705                    let total_wait = i * WARN_DELAY_S;
706                    warn!(
707                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
708                        total_wait, transaction_keys
709                    );
710                }
711            })))
712        } else {
713            None
714        };
715
716        if let Some(processed_waiter) = processed_waiter {
717            debug!("Submitting {:?} to consensus", transaction_keys);
718
719            // populate the position only when this authority submits the transaction
720            // to consensus
721            guard.position = Some(position);
722            guard.positions_moved = Some(positions_moved);
723            guard.preceding_disconnected = Some(preceding_disconnected);
724
725            let _permit: SemaphorePermit = self
726                .submit_semaphore
727                .acquire()
728                .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
729                .await
730                .expect("Consensus adapter does not close semaphore");
731            let _in_flight_submission_guard =
732                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
733
734            // We enter this branch when, in the select above, await_submit completed
735            // and processed_waiter is still pending. This means it is time for us to
736            // submit the transaction to consensus.
737            let submit_inner = async {
738                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
739
740                loop {
741                    // Submit the transaction to consensus and return the submit result with a
742                    // status waiter
743                    let status_waiter = self
744                        .submit_inner(
745                            &transactions,
746                            epoch_store,
747                            &transaction_keys,
748                            tx_type,
749                            is_soft_bundle,
750                        )
751                        .await;
752
753                    match status_waiter.await {
754                        Ok(BlockStatus::Sequenced(_)) => {
755                            self.metrics
756                                .sequencing_certificate_status
757                                .with_label_values(&[tx_type, "sequenced"])
758                                .inc();
759                            // Block has been sequenced. Nothing more to do, we do have guarantees
760                            // that the transaction will appear in consensus output.
761                            trace!(
762                                "Transaction {transaction_keys:?} has been sequenced by consensus."
763                            );
764                            break;
765                        }
766                        Ok(BlockStatus::GarbageCollected(_)) => {
767                            self.metrics
768                                .sequencing_certificate_status
769                                .with_label_values(&[tx_type, "garbage_collected"])
770                                .inc();
771                            // Block has been garbage collected and we have no guarantees that the
772                            // transaction will appear in consensus output. We'll
773                            // resubmit the transaction to consensus. If the transaction has
774                            // already been "processed", then probably someone else has submitted
775                            // it and managed to get it sequenced. In that case this future will
776                            // have been cancelled anyway, so no need to check the processed
777                            // output here.
778                            debug!(
779                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
780                            );
781                            time::sleep(RETRY_DELAY_STEP).await;
782                            continue;
783                        }
784                        Err(err) => {
785                            warn!(
786                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
787                                err
788                            );
789                            time::sleep(RETRY_DELAY_STEP).await;
790                            continue;
791                        }
792                    }
793                }
794            };
795
796            guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
797                Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
798                Either::Right(((), processed_waiter)) => {
799                    debug!("Submitted {transaction_keys:?} to consensus");
800                    processed_waiter.await
801                }
802            };
803        }
804        debug!("{transaction_keys:?} processed by consensus");
805
806        let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
807        epoch_store
808            .remove_pending_consensus_transactions(&consensus_keys)
809            .expect("Storage error when removing consensus transaction");
810
811        let is_user_tx = is_soft_bundle
812            || matches!(
813                transactions[0].kind,
814                ConsensusTransactionKind::CertifiedTransaction(_)
815            );
816        let send_end_of_publish = if is_user_tx {
817            // If we are in RejectUserCerts state and we just drained the list, we need to
818            // send EndOfPublish to signal other validators that we are not submitting more
819            // certificates to the epoch. Note that there could be a race
820            // condition here where we enter this check in RejectAllCerts state.
821            // In that case we don't need to send EndOfPublish because the condition to
822            // enter RejectAllCerts is that 2f+1 other validators have already sequenced
823            // their EndOfPublish message. Also note that we could send multiple
824            // EndOfPublish messages because multiple tasks can enter here with
825            // pending_count == 0. This doesn't affect correctness.
826            if epoch_store
827                .get_reconfig_state_read_lock_guard()
828                .is_reject_user_certs()
829            {
830                let pending_count = epoch_store.pending_consensus_certificates_count();
831                debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
832                pending_count == 0 // send end of epoch if empty
833            } else {
834                false
835            }
836        } else {
837            false
838        };
839        if send_end_of_publish {
840            // sending the message outside of any lock scope
841            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
842            if let Err(err) = self.submit(
843                ConsensusTransaction::new_end_of_publish(self.authority),
844                None,
845                epoch_store,
846            ) {
847                warn!("Error when sending end of publish message: {:?}", err);
848            }
849        }
850        self.metrics
851            .sequencing_certificate_success
852            .with_label_values(&[tx_type])
853            .inc();
854    }
855
856    async fn submit_inner(
857        self: &Arc<Self>,
858        transactions: &[ConsensusTransaction],
859        epoch_store: &Arc<AuthorityPerEpochStore>,
860        transaction_keys: &[SequencedConsensusTransactionKey],
861        tx_type: &str,
862        is_soft_bundle: bool,
863    ) -> BlockStatusReceiver {
864        let ack_start = Instant::now();
865        let mut retries: u32 = 0;
866
867        let status_waiter = loop {
868            match self
869                .consensus_client
870                .submit(transactions, epoch_store)
871                .await
872            {
873                Err(err) => {
874                    // This can happen during reconfig, or when consensus has full internal buffers
875                    // and needs to apply backpressure, so retry a few times before logging warnings.
876                    if retries > 30
877                        || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
878                    {
879                        warn!(
880                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
881                        );
882                    }
883                    self.metrics
884                        .sequencing_certificate_failures
885                        .with_label_values(&[tx_type])
886                        .inc();
887                    retries += 1;
888
889                    if !is_soft_bundle && transactions[0].kind.is_dkg() {
890                        // Shorter delay for DKG messages, which are time-sensitive and happen at
891                        // start-of-epoch when submit errors due to active reconfig are likely.
892                        time::sleep(Duration::from_millis(100)).await;
893                    } else {
894                        time::sleep(Duration::from_secs(10)).await;
895                    };
896                }
897                Ok(status_waiter) => {
898                    break status_waiter;
899                }
900            }
901        };
902
903        // We want to record the number of retries when reporting latency, but to
904        // avoid high label cardinality we do some simple bucketing, which gives a
905        // good enough idea of how many retries were associated with the latency.
906        let bucket = match retries {
907            0..=10 => retries.to_string(), // just report the retry count as is
908            11..=20 => "between_10_and_20".to_string(),
909            21..=50 => "between_20_and_50".to_string(),
910            51..=100 => "between_50_and_100".to_string(),
911            _ => "over_100".to_string(),
912        };
913
914        self.metrics
915            .sequencing_acknowledge_latency
916            .with_label_values(&[bucket.as_str(), tx_type])
917            .observe(ack_start.elapsed().as_secs_f64());
918
919        status_waiter
920    }
921
922    /// Waits for transactions either to appear in consensus output or to be
923    /// executed via a checkpoint (state sync).
924    /// Returns the processed method: whether the transactions have been
925    /// processed via consensus or have been synced via a checkpoint.
926    async fn await_consensus_or_checkpoint(
927        self: &Arc<Self>,
928        transaction_keys: Vec<SequencedConsensusTransactionKey>,
929        epoch_store: &Arc<AuthorityPerEpochStore>,
930    ) -> ProcessedMethod {
931        let notifications = FuturesUnordered::new();
932        for transaction_key in transaction_keys {
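            // For certificate transactions, also track the digest so we can be notified
            // when the transaction is executed via a synced checkpoint.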
933            let transaction_digests = if let SequencedConsensusTransactionKey::External(
934                ConsensusTransactionKey::Certificate(digest),
935            ) = transaction_key
936            {
937                vec![digest]
938            } else {
939                vec![]
940            };
941
942            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
943                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
944            ) = transaction_key
945            {
946                // If the transaction is a checkpoint signature, we can also wait to get
947                // notified when a checkpoint with equal or higher sequence
948                // number has already been synced. This way we don't try to unnecessarily
949                // sequence the signature for an already verified checkpoint.
950                Either::Left(epoch_store.synced_checkpoint_notify(checkpoint_sequence_number))
951            } else {
952                Either::Right(future::pending())
953            };
954
955            // We wait for each transaction individually to be processed by consensus or
956            // executed in a checkpoint. We could equally just get notified in
957            // aggregate when all transactions are processed, but with this approach we get
958            // notified in a more fine-grained way, as transactions can be marked
959            // as processed in different ways. This is mostly a concern for the soft-bundle
960            // transactions.
961            notifications.push(async move {
962                tokio::select! {
963                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
964                        processed.expect("Storage error when waiting for consensus message processed");
965                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
966                        return ProcessedMethod::Consensus;
967                    },
968                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
969                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
970                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
971                    }
972                    processed = checkpoint_synced_future => {
973                        processed.expect("Error when waiting for checkpoint sequence number");
974                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
975                    }
976                }
977                ProcessedMethod::Checkpoint
978            });
979        }
980
981        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
982        for method in processed_methods {
983            if method == ProcessedMethod::Checkpoint {
984                return ProcessedMethod::Checkpoint;
985            }
986        }
987        ProcessedMethod::Consensus
988    }
989}
990
991impl CheckConnection for ConnectionMonitorStatus {
992    fn check_connection(
993        &self,
994        ourself: &AuthorityName,
995        authority: &AuthorityName,
996    ) -> Option<ConnectionStatus> {
997        if ourself == authority {
998            return Some(ConnectionStatus::Connected);
999        }
1000
1001        let mapping = self.authority_names_to_peer_ids.load_full();
1002        let peer_id = match mapping.get(authority) {
1003            Some(p) => p,
1004            None => {
1005                warn!(
1006                    "failed to find peer {:?} in connection monitor listener",
1007                    authority
1008                );
1009                return None;
1010            }
1011        };
1012
1013        let res = match self.connection_statuses.try_get(peer_id) {
1014            TryResult::Present(c) => Some(c.value().clone()),
1015            TryResult::Absent => None,
1016            TryResult::Locked => {
1017                // update is in progress, assume the status is still or becoming disconnected
1018                Some(ConnectionStatus::Disconnected)
1019            }
1020        };
1021        res
1022    }
1023    fn update_mapping_for_epoch(
1024        &self,
1025        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1026    ) {
1027        self.authority_names_to_peer_ids
1028            .swap(Arc::new(authority_names_to_peer_ids));
1029    }
1030}
1031
1032impl CheckConnection for ConnectionMonitorStatusForTests {
1033    fn check_connection(
1034        &self,
1035        _ourself: &AuthorityName,
1036        _authority: &AuthorityName,
1037    ) -> Option<ConnectionStatus> {
1038        Some(ConnectionStatus::Connected)
1039    }
1040    fn update_mapping_for_epoch(
1041        &self,
1042        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1043    ) {
1044    }
1045}
1046
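/// Returns the index of `search_authority` in `positions`; panics if the
/// authority is not present in the list.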
1047pub fn get_position_in_list(
1048    search_authority: AuthorityName,
1049    positions: Vec<AuthorityName>,
1050) -> usize {
1051    positions
1052        .into_iter()
1053        .find_position(|authority| *authority == search_authority)
1054        .expect("Couldn't find ourselves in shuffled committee")
1055        .0
1056}
1057
1058impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1059    /// This method is called externally to begin reconfiguration.
1060    /// It transitions the reconfig state to reject new certificates from users.
1061    /// ConsensusAdapter will send an EndOfPublish message once the pending
1062    /// certificate queue is drained.
1063    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1064        let send_end_of_publish = {
1065            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1066            if !reconfig_guard.should_accept_user_certs() {
1067                // Allow caller to call this method multiple times
1068                return;
1069            }
1070            let pending_count = epoch_store.pending_consensus_certificates_count();
1071            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
1072            let send_end_of_publish = pending_count == 0;
1073            epoch_store.close_user_certs(reconfig_guard);
1074            send_end_of_publish
1075            // reconfig_guard lock is dropped here.
1076        };
1077        if send_end_of_publish {
1078            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1079            if let Err(err) = self.submit(
1080                ConsensusTransaction::new_end_of_publish(self.authority),
1081                None,
1082                epoch_store,
1083            ) {
1084                warn!("Error when sending end of publish message: {:?}", err);
1085            }
1086        }
1087    }
1088}
1089
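/// Join handle wrapper that aborts the wrapped task when dropped.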
1090struct CancelOnDrop<T>(JoinHandle<T>);
1091
1092impl<T> Deref for CancelOnDrop<T> {
1093    type Target = JoinHandle<T>;
1094
1095    fn deref(&self) -> &Self::Target {
1096        &self.0
1097    }
1098}
1099
1100impl<T> Drop for CancelOnDrop<T> {
1101    fn drop(&mut self) {
1102        self.0.abort();
1103    }
1104}
1105
1106/// Tracks number of inflight consensus requests and relevant metrics
1107struct InflightDropGuard<'a> {
1108    adapter: &'a ConsensusAdapter,
1109    start: Instant,
1110    position: Option<usize>,
1111    positions_moved: Option<usize>,
1112    preceding_disconnected: Option<usize>,
1113    tx_type: &'static str,
1114    processed_method: ProcessedMethod,
1115}
1116
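/// How a submitted transaction was observed to be processed: via consensus output
/// or via a (synced) checkpoint.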
1117#[derive(PartialEq, Eq)]
1118enum ProcessedMethod {
1119    Consensus,
1120    Checkpoint,
1121}
1122
1123impl<'a> InflightDropGuard<'a> {
1124    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1125        adapter
1126            .num_inflight_transactions
1127            .fetch_add(1, Ordering::SeqCst);
1128        adapter
1129            .metrics
1130            .sequencing_certificate_inflight
1131            .with_label_values(&[tx_type])
1132            .inc();
1133        adapter
1134            .metrics
1135            .sequencing_certificate_attempt
1136            .with_label_values(&[tx_type])
1137            .inc();
1138        Self {
1139            adapter,
1140            start: Instant::now(),
1141            position: None,
1142            positions_moved: None,
1143            preceding_disconnected: None,
1144            tx_type,
1145            processed_method: ProcessedMethod::Consensus,
1146        }
1147    }
1148}
1149
1150impl Drop for InflightDropGuard<'_> {
1151    fn drop(&mut self) {
1152        self.adapter
1153            .num_inflight_transactions
1154            .fetch_sub(1, Ordering::SeqCst);
1155        self.adapter
1156            .metrics
1157            .sequencing_certificate_inflight
1158            .with_label_values(&[self.tx_type])
1159            .dec();
1160
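        // A position of `None` means this authority never submitted the transaction
        // itself; it was processed before our submission turn came.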
1161        let position = if let Some(position) = self.position {
1162            self.adapter
1163                .metrics
1164                .sequencing_certificate_authority_position
1165                .observe(position as f64);
1166            position.to_string()
1167        } else {
1168            "not_submitted".to_string()
1169        };
1170
1171        if let Some(positions_moved) = self.positions_moved {
1172            self.adapter
1173                .metrics
1174                .sequencing_certificate_positions_moved
1175                .observe(positions_moved as f64);
1176        };
1177
1178        if let Some(preceding_disconnected) = self.preceding_disconnected {
1179            self.adapter
1180                .metrics
1181                .sequencing_certificate_preceding_disconnected
1182                .observe(preceding_disconnected as f64);
1183        };
1184
1185        let latency = self.start.elapsed();
1186        let processed_method = match self.processed_method {
1187            ProcessedMethod::Consensus => "processed_via_consensus",
1188            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1189        };
1190        self.adapter
1191            .metrics
1192            .sequencing_certificate_latency
1193            .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1194            .observe(latency.as_secs_f64());
1195
1196        // Only sample latency after consensus quorum is up. Otherwise, the wait for
1197        // consensus quorum at the beginning of an epoch can distort the sampled
1198        // latencies. Technically there are more system transaction types that
1199        // can be included in samples after the first consensus commit, but this
1200        // set of types should be enough.
1201        if self.position == Some(0) {
1202            // Transaction types below require that a quorum existed in the current epoch.
1203            // TODO: refactor tx_type to enum.
1204            let sampled = matches!(
1205                self.tx_type,
1206                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1207            );
1208            // If the tx has been processed by checkpoint state sync, then exclude it from
1209            // the latency calculations, as this can lead to misleading results.
1210            if sampled && self.processed_method == ProcessedMethod::Consensus {
1211                self.adapter.latency_observer.report(latency);
1212            }
1213        }
1214    }
1215}
1216
1217impl SubmitToConsensus for Arc<ConsensusAdapter> {
1218    fn submit_to_consensus(
1219        &self,
1220        transactions: &[ConsensusTransaction],
1221        epoch_store: &Arc<AuthorityPerEpochStore>,
1222    ) -> IotaResult {
1223        self.submit_batch(transactions, None, epoch_store)
1224            .map(|_| ())
1225    }
1226}
1227
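/// Returns the position of `ourselves` in the stake-weighted shuffle of the
/// committee derived from `tx_digest`, i.e. the order in which this validator
/// would submit the certificate to consensus.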
1228pub fn position_submit_certificate(
1229    committee: &Committee,
1230    ourselves: &AuthorityName,
1231    tx_digest: &TransactionDigest,
1232) -> usize {
1233    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1234    get_position_in_list(*ourselves, validators)
1235}
1236
1237#[cfg(test)]
1238mod adapter_tests {
1239    use std::{sync::Arc, time::Duration};
1240
1241    use fastcrypto::traits::KeyPair;
1242    use iota_types::{
1243        base_types::TransactionDigest,
1244        committee::Committee,
1245        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
1246    };
1247    use rand::{Rng, SeedableRng, rngs::StdRng};
1248
1249    use super::position_submit_certificate;
1250    use crate::{
1251        consensus_adapter::{
1252            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
1253        },
1254        mysticeti_adapter::LazyMysticetiClient,
1255    };
1256
1257    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
1258        let authorities = (0..size)
1259            .map(|_k| {
1260                (
1261                    AuthorityPublicKeyBytes::from(
1262                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
1263                    ),
1264                    rng.gen_range(0u64..10u64),
1265                )
1266            })
1267            .collect::<Vec<_>>();
1268        Committee::new_for_testing_with_normalized_voting_power(
1269            0,
1270            authorities.iter().cloned().collect(),
1271        )
1272    }
1273
1274    #[tokio::test]
1275    async fn test_await_submit_delay_user_transaction() {
1276        // grab a random committee and a random stake distribution
1277        let mut rng = StdRng::from_seed([0; 32]);
1278        let committee = test_committee(&mut rng, 10);
1279
1280        // When we define max submit position and delay step
1281        let consensus_adapter = ConsensusAdapter::new(
1282            Arc::new(LazyMysticetiClient::new()),
1283            *committee.authority_by_index(0).unwrap(),
1284            Arc::new(ConnectionMonitorStatusForTests {}),
1285            100_000,
1286            100_000,
1287            Some(1),
1288            Some(Duration::from_secs(2)),
1289            ConsensusAdapterMetrics::new_test(),
1290        );
1291
1292        // transaction to submit
1293        let tx_digest = TransactionDigest::generate(&mut rng);
1294
1295        // Ensure that the original position is higher than the configured max_submit_position
1296        let (position, positions_moved, _) =
1297            consensus_adapter.submission_position(&committee, &tx_digest);
1298        assert_eq!(position, 7);
1299        assert_eq!(positions_moved, 0);
1300
1301        // Make sure that the position is capped at the configured max_submit_position
1302        let (delay_step, position, positions_moved, _) =
1303            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);
1304
1305        assert_eq!(position, 1);
1306        assert_eq!(delay_step, Duration::from_secs(2));
1307        assert_eq!(positions_moved, 0);
1308
1309        // Without submit position and delay step
1310        let consensus_adapter = ConsensusAdapter::new(
1311            Arc::new(LazyMysticetiClient::new()),
1312            *committee.authority_by_index(0).unwrap(),
1313            Arc::new(ConnectionMonitorStatusForTests {}),
1314            100_000,
1315            100_000,
1316            None,
1317            None,
1318            ConsensusAdapterMetrics::new_test(),
1319        );
1320
1321        let (delay_step, position, positions_moved, _) =
1322            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);
1323
1324        assert_eq!(position, 7);
1325
1326        // total delay = (default latency 1s * 2) * position 7 = 14s
1327        assert_eq!(delay_step, Duration::from_secs(14));
1328        assert_eq!(positions_moved, 0);
1329    }
1330
1331    #[test]
1332    fn test_position_submit_certificate() {
1333        // grab a random committee and a random stake distribution
1334        let mut rng = StdRng::from_seed([0; 32]);
1335        let committee = test_committee(&mut rng, 10);
1336
1337        // generate random transaction digests, and account for validator selection
1338        const NUM_TEST_TRANSACTIONS: usize = 1000;
1339
1340        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
1341            let tx_digest = TransactionDigest::generate(&mut rng);
1342
1343            let mut zero_found = false;
1344            for (name, _) in committee.members() {
1345                let f = position_submit_certificate(&committee, name, &tx_digest);
1346                assert!(f < committee.num_members());
1347                if f == 0 {
1348                    // One and only one validator gets position 0
1349                    assert!(!zero_found);
1350                    zero_found = true;
1351                }
1352            }
1353            assert!(zero_found);
1354        }
1355    }
1356}