// iota_core/consensus_adapter.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashMap,
    future::Future,
    ops::Deref,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::Instant,
};

use arc_swap::ArcSwap;
use consensus_core::BlockStatus;
use dashmap::{DashMap, try_result::TryResult};
use futures::{
    FutureExt, StreamExt,
    future::{self, Either, select},
    pin_mut,
    stream::FuturesUnordered,
};
use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, spawn_monitored_task};
use iota_simulator::anemo::PeerId;
use iota_types::{
    base_types::{AuthorityName, TransactionDigest},
    committee::Committee,
    error::{IotaError, IotaResult},
    fp_ensure,
    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
};
use itertools::Itertools;
use parking_lot::RwLockReadGuard;
use prometheus::{
    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use tokio::{
    sync::{Semaphore, SemaphorePermit, oneshot},
    task::JoinHandle,
    time::{self, Duration},
};
use tracing::{debug, info, trace, warn};

use crate::{
    authority::authority_per_epoch_store::AuthorityPerEpochStore,
    connection_monitor::ConnectionStatus,
    consensus_handler::{SequencedConsensusTransactionKey, classify},
    epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
    metrics::LatencyObserver,
};

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;

const SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.1, 0.25, 0.5, 0.75, 1., 1.25, 1.5, 1.75, 2., 2.25, 2.5, 2.75, 3., 4., 5., 6., 7., 10., 15.,
    20., 25., 30., 60., 90., 120., 150., 180., 210., 240., 270., 300.,
];

const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];

pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    pub sequencing_certificate_attempt: IntCounterVec,
    pub sequencing_certificate_success: IntCounterVec,
    pub sequencing_certificate_failures: IntCounterVec,
    pub sequencing_certificate_status: IntCounterVec,
    pub sequencing_certificate_inflight: IntGaugeVec,
    pub sequencing_acknowledge_latency: HistogramVec,
    pub sequencing_certificate_latency: HistogramVec,
    pub sequencing_certificate_authority_position: Histogram,
    pub sequencing_certificate_positions_moved: Histogram,
    pub sequencing_certificate_preceding_disconnected: Histogram,
    pub sequencing_certificate_processed: IntCounterVec,
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    pub sequencing_in_flight_submissions: IntGauge,
    pub sequencing_estimated_latency: IntGauge,
    pub sequencing_resubmission_interval_ms: IntGauge,
}

impl ConsensusAdapterMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
                &["retry", "tx_type"],
                SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            )
            .unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to local consensus instance and not yet sequenced",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
        }
    }

    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }
}

pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;

#[mockall::automock]
#[async_trait::async_trait]
pub trait SubmitToConsensus: Sync + Send + 'static {
    async fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}

/// Submit IOTA certificates to the consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit to number of inflight transactions at this node.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// Dictates the maximum position from which this authority will submit to
    /// consensus. Even if it is elected to submit from a higher position than
    /// this, it will "reset" to the max_submit_position.
    max_submit_position: Option<usize>,
    /// When provided, it will override the current backoff logic and use this
    /// value as the delay step instead.
    submit_delay_step_override: Option<Duration>,
    /// A structure to check the connection statuses populated by the Connection
    /// Monitor Listener
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// A structure to check the reputation scores populated by Consensus
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// A structure to register metrics
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus
    submit_semaphore: Semaphore,
    latency_observer: LatencyObserver,
}

pub trait CheckConnection: Send + Sync {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}

pub struct ConnectionMonitorStatus {
    /// Current connection statuses forwarded from the connection monitor
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// A map from authority name to peer id
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}

pub struct ConnectionMonitorStatusForTests {}

impl ConsensusAdapter {
    /// Make a new Consensus adapter instance.
    pub fn new(
        consensus_client: Arc<dyn ConsensusClient>,
        authority: AuthorityName,
        connection_monitor_status: Arc<dyn CheckConnection>,
        max_pending_transactions: usize,
        max_pending_local_submissions: usize,
        max_submit_position: Option<usize>,
        submit_delay_step_override: Option<Duration>,
        metrics: ConsensusAdapterMetrics,
    ) -> Self {
        let num_inflight_transactions = Default::default();
        let low_scoring_authorities =
            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
        Self {
            consensus_client,
            authority,
            max_pending_transactions,
            max_submit_position,
            submit_delay_step_override,
            num_inflight_transactions,
            connection_monitor_status,
            low_scoring_authorities,
            metrics,
            submit_semaphore: Semaphore::new(max_pending_local_submissions),
            latency_observer: LatencyObserver::new(),
        }
    }

    pub fn swap_low_scoring_authorities(
        &self,
        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    ) {
        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
    }

    // TODO - this probably needs to hold some kind of lock to make sure the epoch
    // does not change while we are recovering
    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // Currently consensus worker might lose transactions on restart, so we need to
        // resend them.
        // TODO: get_all_pending_consensus_transactions is called
        // twice when initializing AuthorityPerEpochStore and here, should not
        // be a big deal but can be optimized
        let mut recovered = epoch_store.get_all_pending_consensus_transactions();

        #[expect(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
        if epoch_store
            .get_reconfig_state_read_lock_guard()
            .is_reject_user_certs()
            && epoch_store.pending_consensus_certificates_empty()
        {
            if !recovered
                .iter()
                .any(ConsensusTransaction::is_end_of_publish)
            {
                // There are two cases when this is needed
                // (1) We send the EndOfPublish message after removing pending certificates
                // in submit_and_wait_inner. It is possible that the node will crash between
                // those two steps, in which case we might need to re-introduce the
                // EndOfPublish message on restart.
                // (2) If the node crashed inside ConsensusAdapter::close_epoch, after the
                // reconfig lock state was written to DB and before we persisted the
                // EndOfPublish message.
                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
            }
        }
        debug!(
            "Submitting {:?} recovered pending consensus transactions to consensus",
            recovered.len()
        );
        for transaction in recovered {
            if transaction.is_end_of_publish() {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
            }
            self.submit_unchecked(&[transaction], epoch_store);
        }
    }

    fn await_submit_delay(
        &self,
        committee: &Committee,
        transactions: &[ConsensusTransaction],
    ) -> (impl Future<Output = ()>, usize, usize, usize) {
        // Use the minimum digest to compute submit delay.
        let min_digest = transactions
            .iter()
            .filter_map(|tx| match &tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
                    Some(certificate.digest())
                }
                _ => None,
            })
            .min();

        let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
            Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
            _ => (Duration::ZERO, 0, 0, 0),
        };
        (
            tokio::time::sleep(duration),
            position,
            positions_moved,
            preceding_disconnected,
        )
    }

    fn await_submit_delay_user_transaction(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (Duration, usize, usize, usize) {
        let (position, positions_moved, preceding_disconnected) =
            self.submission_position(committee, tx_digest);

        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
        const MIN_LATENCY: Duration = Duration::from_millis(150);
        const MAX_LATENCY: Duration = Duration::from_secs(3);

        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
        self.metrics
            .sequencing_estimated_latency
            .set(latency.as_millis() as i64);

        let latency = std::cmp::max(latency, MIN_LATENCY);
        let latency = std::cmp::min(latency, MAX_LATENCY);
        let latency = latency * 2;
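        // Worked example (illustrative, not from the original source): an observed
        // latency of 4s is clamped to MAX_LATENCY (3s) and doubled to a 6s delay
        // step; with the default 1s latency the step is 2s, so an authority at
        // position 3 would wait 2s * 3 = 6s before submitting.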
        let (delay_step, position) =
            self.override_by_max_submit_position_settings(latency, position);

        self.metrics
            .sequencing_resubmission_interval_ms
            .set(delay_step.as_millis() as i64);

        (
            delay_step * position as u32,
            position,
            positions_moved,
            preceding_disconnected,
        )
    }

    /// Overrides the latency and the position if settings are defined for
    /// `max_submit_position` and `submit_delay_step_override`. If
    /// `max_submit_position` is defined, it will always be used irrespective
    /// of any decision made so far. The same applies to
    /// `submit_delay_step_override`.
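    ///
    /// For example (illustrative): with `max_submit_position = Some(1)` a
    /// computed position of 7 is reduced to 1, and with
    /// `submit_delay_step_override = Some(Duration::from_secs(2))` the
    /// latency-derived delay step is replaced by 2 seconds.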
    fn override_by_max_submit_position_settings(
        &self,
        latency: Duration,
        mut position: usize,
    ) -> (Duration, usize) {
        // Respect any manual override for position and latency from the settings
        if let Some(max_submit_position) = self.max_submit_position {
            position = std::cmp::min(position, max_submit_position);
        }

        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
        (delay_step, position)
    }

    /// Check when this authority should submit the certificate to consensus.
    /// This sorts all authorities based on a pseudo-random distribution derived
    /// from the transaction hash.
    ///
    /// The function targets having 1 consensus transaction submitted per user
    /// transaction when the system operates normally.
    ///
    /// The function returns the position at which it is this authority's turn
    /// to submit the transaction to consensus.
    fn submission_position(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (usize, usize, usize) {
        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);

        self.check_submission_wrt_connectivity_and_scores(positions)
    }

    /// This function runs the following algorithm to decide whether or not to
    /// submit a transaction to consensus.
    ///
    /// It takes in a deterministic list that represents the positions of all
    /// the authorities. The authority in the first position will be
    /// responsible for submitting to consensus, so we check if we are this
    /// validator, and if so, we submit.
    ///
    /// If we are not in that position, we check our connectivity to the
    /// authority in that position. If we are connected to them, we can
    /// assume that they are operational and will submit the transaction. If
    /// we are not connected to them, we assume that they are not operational
    /// and we will not rely on that authority to submit the transaction. So
    /// we shift them out of the first position, and run this algorithm
    /// again on the new set of positions.
    ///
    /// This can possibly result in a transaction being submitted twice if an
    /// authority sees a false negative in connectivity to another, such as
    /// in the case of a network partition.
    ///
    /// Likewise, if an authority ahead of us in the positions is a low
    /// performing authority, we move our position up one and submit the
    /// transaction. This helps maintain overall performance. We only do this
    /// for authorities that are not low performers themselves, to prevent
    /// extra amplification in the case that the positions look like
    /// [low_scoring_a1, low_scoring_a2, a3].
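    ///
    /// Illustrative walk-through (editor's example, not from the original
    /// docs): with positions [a1, a2, us, a4], where a1 is disconnected and a2
    /// is low-scoring, both a1 and a2 are filtered out. Our position moves
    /// from 2 to 0 (so `positions_moved` is 2), and `preceding_disconnected`
    /// is reported as 1.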
    fn check_submission_wrt_connectivity_and_scores(
        &self,
        positions: Vec<AuthorityName>,
    ) -> (usize, usize, usize) {
        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
        if low_scoring_authorities.get(&self.authority).is_some() {
            return (positions.len(), 0, 0);
        }
        let initial_position = get_position_in_list(self.authority, positions.clone());
        let mut preceding_disconnected = 0;
        let mut before_our_position = true;

        let filtered_positions: Vec<_> = positions
            .into_iter()
            .filter(|authority| {
                let keep = self.authority == *authority; // don't filter ourself out
                if keep {
                    before_our_position = false;
                }

                // filter out any nodes that appear disconnected
                let connected = self
                    .connection_monitor_status
                    .check_connection(&self.authority, authority)
                    .unwrap_or(ConnectionStatus::Disconnected)
                    == ConnectionStatus::Connected;
                if !connected && before_our_position {
                    preceding_disconnected += 1; // used for metrics
                }

                // Filter out low scoring nodes
                let high_scoring = low_scoring_authorities.get(authority).is_none();

                keep || (connected && high_scoring)
            })
            .collect();

        let position = get_position_in_list(self.authority, filtered_positions);

        (
            position,
            initial_position - position,
            preceding_disconnected,
        )
    }

    /// This method blocks until the transaction is persisted in the local
    /// database. It then returns a handle to an async task; the caller can
    /// join this handle to await while the transaction is processed by
    /// consensus.
    ///
    /// This method guarantees that once submit (but not the returned async
    /// handle) returns, the transaction is persisted and will eventually be
    /// sent to consensus, even after a restart.
    ///
    /// When submitting a certificate the caller **must** provide a
    /// ReconfigState lock guard.
    pub fn submit(
        self: &Arc<Self>,
        transaction: ConsensusTransaction,
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<JoinHandle<()>> {
        self.submit_batch(&[transaction], lock, epoch_store)
    }

    pub fn submit_batch(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<JoinHandle<()>> {
        if transactions.len() > 1 {
            // In a soft bundle, we need to check that all transactions are of
            // CertifiedTransaction kind. The check is required because we assume this
            // in submit_and_wait_inner.
            for transaction in transactions {
                fp_ensure!(
                    matches!(
                        transaction.kind,
                        ConsensusTransactionKind::CertifiedTransaction(_)
                    ),
                    IotaError::InvalidTxKindInSoftBundle
                );
            }
        }

        epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
        Ok(self.submit_unchecked(transactions, epoch_store))
    }

    /// Performs weakly consistent checks on internal buffers to quickly
    /// discard transactions if we are overloaded
    pub fn check_limits(&self) -> bool {
        // First check total transactions (waiting and in submission)
        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
            > self.max_pending_transactions
        {
            return false;
        }
        // Then check if submit_semaphore has permits
        self.submit_semaphore.available_permits() > 0
    }

    pub(crate) fn check_consensus_overload(&self) -> IotaResult {
        fp_ensure!(
            self.check_limits(),
            IotaError::TooManyTransactionsPendingConsensus
        );
        Ok(())
    }

    fn submit_unchecked(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> JoinHandle<()> {
        // Reconfiguration lock is dropped when pending_consensus_transactions is
        // persisted, before it is handled by consensus
        let async_stage = self
            .clone()
            .submit_and_wait(transactions.to_vec(), epoch_store.clone());
        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
        // (Limit is not applied atomically, and only to user transactions.)
        spawn_monitored_task!(async_stage)
    }

    async fn submit_and_wait(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        // When the epoch_terminated signal is received, all pending
        // submit_and_wait_inner are dropped.
        //
        // This is needed because submit_and_wait_inner waits on read_notify for the
        // consensus message to be processed, which may never happen on an epoch
        // boundary.
        //
        // In addition to that, within_alive_epoch ensures that all pending consensus
        // adapter tasks are stopped before reconfiguration can proceed.
        //
        // This is essential because consensus workers reuse the same ports when
        // consensus restarts, which means that without this barrier we might send
        // transactions from previous epochs to the consensus of the new epoch.
        epoch_store
            .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
            .await
            .ok(); // the result indicates whether the epoch ended earlier; we don't care about it
    }

    async fn submit_and_wait_inner(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        if transactions.is_empty() {
            return;
        }

        // Current code path ensures:
        // - If transactions.len() > 1, it is a soft bundle. Otherwise transactions
        //   should have been submitted individually.
        // - If is_soft_bundle, then all transactions are of CertifiedTransaction kind.
        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and
        //   transactions[0] can be of any kind.
        let is_soft_bundle = transactions.len() > 1;

        let mut transaction_keys = Vec::new();

        for transaction in &transactions {
            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
                epoch_store.record_epoch_pending_certs_process_time_metric();
            }

            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
            transaction_keys.push(transaction_key);
        }
        let tx_type = if !is_soft_bundle {
            classify(&transactions[0])
        } else {
            "soft_bundle"
        };

        let mut guard = InflightDropGuard::acquire(&self, tx_type);

        // Create the waiter until the node's turn comes to submit to consensus
        let (await_submit, position, positions_moved, preceding_disconnected) =
            self.await_submit_delay(epoch_store.committee(), &transactions[..]);

        // Create the waiter until the transaction is processed by consensus or via
        // checkpoint
        let processed_via_consensus_or_checkpoint =
            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
        pin_mut!(processed_via_consensus_or_checkpoint);

        let processed_waiter = tokio::select! {
            // We need to wait for some delay until we submit transaction to the consensus
            _ = await_submit => Some(processed_via_consensus_or_checkpoint),

            // If epoch ends, don't wait for submit delay
            _ = epoch_store.user_certs_closed_notify() => {
                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
                Some(processed_via_consensus_or_checkpoint)
            }

            // If transaction is received by consensus or checkpoint while we wait, we are done.
            _ = &mut processed_via_consensus_or_checkpoint => {
                None
            }
        };

        // Log warnings for administrative transactions that fail to get sequenced
        let _monitor = if !is_soft_bundle
            && matches!(
                transactions[0].kind,
                ConsensusTransactionKind::EndOfPublish(_)
                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
            ) {
            let transaction_keys = transaction_keys.clone();
            Some(CancelOnDrop(spawn_monitored_task!(async {
                let mut i = 0u64;
                loop {
                    i += 1;
                    const WARN_DELAY_S: u64 = 30;
                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                    let total_wait = i * WARN_DELAY_S;
                    warn!(
                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
                        total_wait, transaction_keys
                    );
                }
            })))
        } else {
            None
        };

        if let Some(processed_waiter) = processed_waiter {
            debug!("Submitting {:?} to consensus", transaction_keys);

            // populate the position only when this authority submits the transaction
            // to consensus
            guard.position = Some(position);
            guard.positions_moved = Some(positions_moved);
            guard.preceding_disconnected = Some(preceding_disconnected);

            let _permit: SemaphorePermit = self
                .submit_semaphore
                .acquire()
                .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
                .await
                .expect("Consensus adapter does not close semaphore");
            let _in_flight_submission_guard =
                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

            // We enter this branch when, in the select above, await_submit completed
            // while processed_waiter is still pending. This means it is time for us
            // to submit the transaction to consensus.
            let submit_inner = async {
                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

                loop {
                    // Submit the transaction to consensus and return the submit result with a
                    // status waiter
                    let status_waiter = self
                        .submit_inner(
                            &transactions,
                            epoch_store,
                            &transaction_keys,
                            tx_type,
                            is_soft_bundle,
                        )
                        .await;

                    match status_waiter.await {
                        Ok(BlockStatus::Sequenced(_)) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "sequenced"])
                                .inc();
                            // Block has been sequenced. Nothing more to do, we do have guarantees
                            // that the transaction will appear in consensus output.
                            trace!(
                                "Transaction {transaction_keys:?} has been sequenced by consensus."
                            );
                            break;
                        }
                        Ok(BlockStatus::GarbageCollected(_)) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "garbage_collected"])
                                .inc();
                            // The block has been garbage collected and we have no guarantee that
                            // the transaction will appear in consensus output. We'll resubmit the
                            // transaction to consensus. If the transaction has already been
                            // "processed", then probably someone else submitted it and managed to
                            // get it sequenced; in that case this future will have been cancelled
                            // anyway, so there is no need to check the processed output here.
                            debug!(
                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                        Err(err) => {
                            warn!(
                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                                err
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                    }
                }
            };

            guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
                Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
                Either::Right(((), processed_waiter)) => {
                    debug!("Submitted {transaction_keys:?} to consensus");
                    processed_waiter.await
                }
            };
        }
        debug!("{transaction_keys:?} processed by consensus");

        let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
        epoch_store
            .remove_pending_consensus_transactions(&consensus_keys)
            .expect("Storage error when removing consensus transaction");

        let is_user_tx = is_soft_bundle
            || matches!(
                transactions[0].kind,
                ConsensusTransactionKind::CertifiedTransaction(_)
            );
        let send_end_of_publish = if is_user_tx {
            // If we are in RejectUserCerts state and we just drained the list, we need
            // to send EndOfPublish to signal other validators that we are not submitting
            // more certificates to the epoch. Note that there could be a race condition
            // here where we enter this check in RejectAllCerts state. In that case we
            // don't need to send EndOfPublish, because the condition to enter
            // RejectAllCerts is that 2f+1 other validators have already sequenced their
            // EndOfPublish messages. Also note that we could send multiple EndOfPublish
            // messages, since multiple tasks can enter here with pending_count == 0.
            // This doesn't affect correctness.
            if epoch_store
                .get_reconfig_state_read_lock_guard()
                .is_reject_user_certs()
            {
                let pending_count = epoch_store.pending_consensus_certificates_count();
                debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
                pending_count == 0 // send end of epoch if empty
            } else {
                false
            }
        } else {
            false
        };
        if send_end_of_publish {
            // sending message outside of any locks scope
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
        self.metrics
            .sequencing_certificate_success
            .with_label_values(&[tx_type])
            .inc();
    }

    async fn submit_inner(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        tx_type: &str,
        is_soft_bundle: bool,
    ) -> BlockStatusReceiver {
        let ack_start = Instant::now();
        let mut retries: u32 = 0;

        let status_waiter = loop {
            match self
                .consensus_client
                .submit(transactions, epoch_store)
                .await
            {
                Err(err) => {
                    // This can happen during reconfig, or when consensus has full internal buffers
                    // and needs to back pressure, so retry a few times before logging warnings.
                    if retries > 30
                        || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
                    {
                        warn!(
                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
                        );
                    }
                    self.metrics
                        .sequencing_certificate_failures
                        .with_label_values(&[tx_type])
                        .inc();
                    retries += 1;

                    if !is_soft_bundle && transactions[0].kind.is_dkg() {
                        // Shorter delay for DKG messages, which are time-sensitive and happen at
                        // start-of-epoch when submit errors due to active reconfig are likely.
                        time::sleep(Duration::from_millis(100)).await;
                    } else {
                        time::sleep(Duration::from_secs(10)).await;
                    };
                }
                Ok(status_waiter) => {
                    break status_waiter;
                }
            }
        };

        // We want to record the number of retries when reporting latency, but to
        // avoid label cardinality explosion we do some simple bucketing to get a
        // good enough idea of how many retries were associated with the latency.
        let bucket = match retries {
            0..=10 => retries.to_string(), // just report the retry count as is
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        };
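        // For example, retries = 7 is reported as the exact label "7", while
        // retries = 35 falls into the "between_20_and_50" bucket.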

        self.metrics
            .sequencing_acknowledge_latency
            .with_label_values(&[bucket.as_str(), tx_type])
            .observe(ack_start.elapsed().as_secs_f64());

        status_waiter
    }

    /// Waits for transactions to either appear in consensus output or be
    /// executed via a checkpoint (state sync).
    /// Returns the processed method: whether the transactions have been
    /// processed via consensus or synced via checkpoint.
    async fn await_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            let transaction_digests = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::Certificate(digest),
            ) = transaction_key
            {
                vec![digest]
            } else {
                vec![]
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to get
                // notified when a checkpoint with an equal or higher sequence number
                // has already been synced. This way we don't unnecessarily sequence
                // the signature for an already verified checkpoint.
                Either::Left(epoch_store.synced_checkpoint_notify(checkpoint_sequence_number))
            } else {
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus or
            // executed in a checkpoint. We could equally just get notified in aggregate
            // when all transactions are processed, but with this approach we get
            // notified in a more fine-grained way, as transactions can be marked as
            // processed in different ways. This is mostly a concern for soft-bundle
            // transactions.
            notifications.push(async move {
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    processed = checkpoint_synced_future => {
                        processed.expect("Error when waiting for checkpoint sequence number");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
}

impl CheckConnection for ConnectionMonitorStatus {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        if ourself == authority {
            return Some(ConnectionStatus::Connected);
        }

        let mapping = self.authority_names_to_peer_ids.load_full();
        let peer_id = match mapping.get(authority) {
            Some(p) => p,
            None => {
                warn!(
                    "failed to find peer {:?} in connection monitor listener",
                    authority
                );
                return None;
            }
        };

        let res = match self.connection_statuses.try_get(peer_id) {
            TryResult::Present(c) => Some(c.value().clone()),
            TryResult::Absent => None,
            TryResult::Locked => {
                // update is in progress, assume the status is still or becoming disconnected
                Some(ConnectionStatus::Disconnected)
            }
        };
        res
    }

    fn update_mapping_for_epoch(
        &self,
        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
        self.authority_names_to_peer_ids
            .swap(Arc::new(authority_names_to_peer_ids));
    }
}

impl CheckConnection for ConnectionMonitorStatusForTests {
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }

    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}

pub fn get_position_in_list(
    search_authority: AuthorityName,
    positions: Vec<AuthorityName>,
) -> usize {
    positions
        .into_iter()
        .find_position(|authority| *authority == search_authority)
        .expect("Couldn't find ourselves in shuffled committee")
        .0
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// This method is called externally to begin reconfiguration.
    /// It transitions the reconfig state to reject new certificates from the
    /// user. ConsensusAdapter will send an EndOfPublish message once the
    /// pending certificate queue is drained.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        let send_end_of_publish = {
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Allow caller to call this method multiple times
                return;
            }
            let pending_count = epoch_store.pending_consensus_certificates_count();
            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
            let send_end_of_publish = pending_count == 0;
            epoch_store.close_user_certs(reconfig_guard);
            send_end_of_publish
            // reconfig_guard lock is dropped here.
        };
        if send_end_of_publish {
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
    }
}

struct CancelOnDrop<T>(JoinHandle<T>);

impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

/// Tracks number of inflight consensus requests and relevant metrics
struct InflightDropGuard<'a> {
    adapter: &'a ConsensusAdapter,
    start: Instant,
    position: Option<usize>,
    positions_moved: Option<usize>,
    preceding_disconnected: Option<usize>,
    tx_type: &'static str,
    processed_method: ProcessedMethod,
}

#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    Consensus,
    Checkpoint,
}

impl<'a> InflightDropGuard<'a> {
    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
        adapter
            .num_inflight_transactions
            .fetch_add(1, Ordering::SeqCst);
        adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[tx_type])
            .inc();
        adapter
            .metrics
            .sequencing_certificate_attempt
            .with_label_values(&[tx_type])
            .inc();
        Self {
            adapter,
            start: Instant::now(),
            position: None,
            positions_moved: None,
            preceding_disconnected: None,
            tx_type,
            processed_method: ProcessedMethod::Consensus,
        }
    }
}

impl Drop for InflightDropGuard<'_> {
    fn drop(&mut self) {
        self.adapter
            .num_inflight_transactions
            .fetch_sub(1, Ordering::SeqCst);
        self.adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[self.tx_type])
            .dec();

        let position = if let Some(position) = self.position {
            self.adapter
                .metrics
                .sequencing_certificate_authority_position
                .observe(position as f64);
            position.to_string()
        } else {
            "not_submitted".to_string()
        };

        if let Some(positions_moved) = self.positions_moved {
            self.adapter
                .metrics
                .sequencing_certificate_positions_moved
                .observe(positions_moved as f64);
        };

        if let Some(preceding_disconnected) = self.preceding_disconnected {
            self.adapter
                .metrics
                .sequencing_certificate_preceding_disconnected
                .observe(preceding_disconnected as f64);
        };

        let latency = self.start.elapsed();
        let processed_method = match self.processed_method {
            ProcessedMethod::Consensus => "processed_via_consensus",
            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
        };
        self.adapter
            .metrics
            .sequencing_certificate_latency
            .with_label_values(&[position.as_str(), self.tx_type, processed_method])
            .observe(latency.as_secs_f64());

        // Only sample latency after consensus quorum is up. Otherwise, the wait for
        // consensus quorum at the beginning of an epoch can distort the sampled
        // latencies. Technically there are more system transaction types that
        // can be included in samples after the first consensus commit, but this
        // set of types should be enough.
        if self.position == Some(0) {
            // The transaction types below require that a quorum existed in the current
            // epoch.
            // TODO: refactor tx_type to an enum.
            let sampled = matches!(
                self.tx_type,
                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
            );
            // If the tx has been processed by checkpoint state sync, exclude it from
            // the latency calculations, as this can lead to misleading results.
            if sampled && self.processed_method == ProcessedMethod::Consensus {
                self.adapter.latency_observer.report(latency);
            }
        }
    }
}

#[async_trait::async_trait]
impl SubmitToConsensus for Arc<ConsensusAdapter> {
    async fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        self.submit_batch(transactions, None, epoch_store)
            .map(|_| ())
    }
}

pub fn position_submit_certificate(
    committee: &Committee,
    ourselves: &AuthorityName,
    tx_digest: &TransactionDigest,
) -> usize {
    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
    get_position_in_list(*ourselves, validators)
}

#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        mysticeti_adapter::LazyMysticetiClient,
    };

    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // When we define max submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        // transaction to submit
        let tx_digest = TransactionDigest::generate(&mut rng);

        // Ensure that the original (unclamped) position is higher than the override
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        // All peers are connected and none are low-scoring, so no positions moved
        assert_eq!(positions_moved, 0);

        // Make sure that the position is clamped to max_submit_position (1)
        let (delay, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        // total delay = overridden delay step (2s) * position (1) = 2s
        assert_eq!(delay, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Without submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        // total delay = (default latency 1s, doubled to a 2s step) * position 7 = 14s
        assert_eq!(delay, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }
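
    // A minimal sanity check (editor's addition, a sketch using only the APIs
    // exercised above): a freshly constructed adapter with no inflight
    // transactions and available semaphore permits should not report overload.
    #[tokio::test]
    async fn test_check_limits_on_fresh_adapter() {
        let mut rng = StdRng::from_seed([1; 32]);
        let committee = test_committee(&mut rng, 4);

        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        // No transactions are inflight and the submit semaphore has permits,
        // so the weakly consistent overload checks should pass.
        assert!(consensus_adapter.check_limits());
        assert!(consensus_adapter.check_consensus_overload().is_ok());
    }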

    #[test]
    fn test_position_submit_certificate() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // generate random transaction digests, and account for validator selection
        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // One and only one validator gets position 0
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}