iota_core/
consensus_adapter.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    future::Future,
8    ops::Deref,
9    sync::{
10        Arc,
11        atomic::{AtomicU64, Ordering},
12    },
13    time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use dashmap::{DashMap, try_result::TryResult};
18use futures::{
19    FutureExt, StreamExt,
20    future::{self, Either, select},
21    pin_mut,
22    stream::FuturesUnordered,
23};
24use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
25use iota_simulator::anemo::PeerId;
26use iota_types::{
27    base_types::{AuthorityName, TransactionDigest},
28    committee::Committee,
29    error::{IotaError, IotaResult},
30    fp_ensure,
31    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
32};
33use itertools::Itertools;
34use parking_lot::RwLockReadGuard;
35use prometheus::{
36    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
37    register_histogram_vec_with_registry, register_histogram_with_registry,
38    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
39    register_int_gauge_with_registry,
40};
41use tokio::{
42    sync::{Semaphore, SemaphorePermit, oneshot},
43    task::JoinHandle,
44    time::{
45        Duration, {self},
46    },
47};
48use tracing::{debug, info, trace, warn};
49
50use crate::{
51    authority::authority_per_epoch_store::AuthorityPerEpochStore,
52    checkpoints::CheckpointStore,
53    connection_monitor::ConnectionStatus,
54    consensus_handler::{SequencedConsensusTransactionKey, classify},
55    epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
56    metrics::LatencyObserver,
57};
58
// Unit tests for this module; the `#[path]` attribute points the module at
// the shared `unit_tests/` directory.
#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;
62
/// Histogram buckets (measured in submission positions, not seconds) used by
/// the `sequencing_certificate_*` position metrics below.
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];
66
/// Prometheus metrics for the consensus adapter (see `Self::new` for the
/// help text registered with each metric).
pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    /// Number of certificates the validator attempts to sequence.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Number of successfully sequenced certificates.
    pub sequencing_certificate_success: IntCounterVec,
    /// Number of sequencing failures other than timeouts.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Sequencing outcome reported by consensus (sequenced / garbage collected).
    pub sequencing_certificate_status: IntCounterVec,
    /// Inflight requests to sequence certificates, per tx type.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Latency until acknowledgement from the sequencing engine.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// End-to-end latency for sequencing a certificate.
    pub sequencing_certificate_latency: HistogramVec,
    /// Position of this authority when it submitted to consensus.
    pub sequencing_certificate_authority_position: Histogram,
    /// Authorities ahead of us that were filtered out before submission.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Earlier-positioned authorities filtered out as disconnected.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Certificates processed, labeled by source (consensus or checkpoint).
    pub sequencing_certificate_processed: IntCounterVec,
    /// Requests currently blocked on the submit semaphore.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Transactions submitted to local consensus and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Consensus latency estimate in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Resubmission interval used by the adapter, in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
}
85
impl ConsensusAdapterMetrics {
    /// Registers all consensus-adapter metrics with the given Prometheus
    /// `registry` and returns the populated metrics struct.
    ///
    /// Panics if a metric with the same name is already registered (the
    /// `unwrap()`s), which indicates a programming error.
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
                &["retry", "tx_type"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when submitted a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            )
            .unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to local consensus instance and not yet sequenced",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
        }
    }

    /// Convenience constructor for tests: registers against a throwaway
    /// registry so tests cannot collide on metric names.
    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }
}
199
/// Block status for internal use in the consensus adapter.
pub enum BlockStatusInternal {
    /// The block containing the transaction was sequenced by consensus.
    Sequenced,
    /// The block was garbage collected without being sequenced.
    GarbageCollected,
}
205
206impl From<starfish_core::BlockStatus> for BlockStatusInternal {
207    fn from(status: starfish_core::BlockStatus) -> Self {
208        match status {
209            starfish_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
210            starfish_core::BlockStatus::GarbageCollected(_) => {
211                BlockStatusInternal::GarbageCollected
212            }
213        }
214    }
215}
216
/// Receiver side of the one-shot channel over which consensus reports the
/// final [`BlockStatusInternal`] of a submitted transaction's block.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatusInternal>;
218
/// Synchronous interface for submitting a batch of transactions to consensus.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` to consensus for the epoch of `epoch_store`.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}
227
/// Asynchronous client interface to the local consensus instance.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` to consensus and returns a receiver that
    /// resolves with the status of the block that carries them.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
237
/// Submit IOTA certificates to the consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// The checkpoint store for the validator
    checkpoint_store: Arc<CheckpointStore>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit to number of inflight transactions at this node.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// Dictates the maximum position from which this node will submit to
    /// consensus. Even if the node is elected to submit from a higher position
    /// than this, it will "reset" to the `max_submit_position`.
    max_submit_position: Option<usize>,
    /// When provided it will override the current back off logic and will use
    /// this value instead as delay step.
    submit_delay_step_override: Option<Duration>,
    /// A structure to check the connection statuses populated by the Connection
    /// Monitor Listener
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// A structure to check the reputation scores populated by Consensus
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// A structure to register metrics
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus
    submit_semaphore: Semaphore,
    /// Rolling estimate of consensus latency, used to size submission delays.
    latency_observer: LatencyObserver,
}
268
/// Checks connection statuses between this node and other authorities.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status from `ourself` to `authority`, or `None`
    /// when the status is unknown.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Installs the authority-name-to-peer-id mapping for a new epoch.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
277
/// Connection-status bookkeeping shared with the connection monitor.
pub struct ConnectionMonitorStatus {
    /// Current connection statuses forwarded from the connection monitor
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// A map from authority name to peer id
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
284
/// Test-only stand-in for [`ConnectionMonitorStatus`]; presumably its
/// `CheckConnection` impl lives elsewhere in this file — verify there.
pub struct ConnectionMonitorStatusForTests {}
286
287impl ConsensusAdapter {
288    /// Make a new Consensus adapter instance.
289    pub fn new(
290        consensus_client: Arc<dyn ConsensusClient>,
291        checkpoint_store: Arc<CheckpointStore>,
292        authority: AuthorityName,
293        connection_monitor_status: Arc<dyn CheckConnection>,
294        max_pending_transactions: usize,
295        max_pending_local_submissions: usize,
296        max_submit_position: Option<usize>,
297        submit_delay_step_override: Option<Duration>,
298        metrics: ConsensusAdapterMetrics,
299    ) -> Self {
300        let num_inflight_transactions = Default::default();
301        let low_scoring_authorities =
302            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
303        Self {
304            consensus_client,
305            checkpoint_store,
306            authority,
307            max_pending_transactions,
308            max_submit_position,
309            submit_delay_step_override,
310            num_inflight_transactions,
311            connection_monitor_status,
312            low_scoring_authorities,
313            metrics,
314            submit_semaphore: Semaphore::new(max_pending_local_submissions),
315            latency_observer: LatencyObserver::new(),
316        }
317    }
318
319    pub fn swap_low_scoring_authorities(
320        &self,
321        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
322    ) {
323        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
324    }
325
326    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
327        // Currently consensus worker might lose transactions on restart, so we need to
328        // resend them.
329        // TODO: get_all_pending_consensus_transactions is called
330        // twice when initializing AuthorityPerEpochStore and here, should not
331        // be a big deal but can be optimized
332        let mut recovered = epoch_store.get_all_pending_consensus_transactions();
333
334        if epoch_store
335            .get_reconfig_state_read_lock_guard()
336            .is_reject_user_certs()
337            && epoch_store.pending_consensus_certificates_empty()
338        {
339            // If `recovered` does not contain `EndOfPublish` yet, we need to insert it.
340            if !recovered
341                .iter()
342                .any(ConsensusTransaction::is_end_of_publish)
343            {
344                // There are two cases when this is needed:
345                // (1) We send `EndOfPublish` message after removing pending certificates in
346                // `submit_and_wait_inner`. It is possible that node will crash
347                // between those two steps, in which case we might need to
348                // re-introduce `EndOfPublish` message on restart.
349                // (2) If node crashed inside `ConsensusAdapter::close_epoch`,
350                // after reconfig lock state was written to DB and before we persisted
351                // `EndOfPublish` message.
352                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
353            }
354        }
355        debug!(
356            "Submitting {:?} recovered pending consensus transactions to consensus",
357            recovered.len()
358        );
359        for transaction in recovered {
360            if transaction.is_end_of_publish() {
361                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
362            }
363            self.submit_unchecked(&[transaction], epoch_store);
364        }
365    }
366
367    fn await_submit_delay(
368        &self,
369        committee: &Committee,
370        transactions: &[ConsensusTransaction],
371    ) -> (impl Future<Output = ()>, usize, usize, usize) {
372        // Use the minimum digest to compute submit delay.
373        let min_digest = transactions
374            .iter()
375            .filter_map(|tx| match &tx.kind {
376                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
377                    Some(certificate.digest())
378                }
379                _ => None,
380            })
381            .min();
382
383        let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
384            Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
385            _ => (Duration::ZERO, 0, 0, 0),
386        };
387        (
388            tokio::time::sleep(duration),
389            position,
390            positions_moved,
391            preceding_disconnected,
392        )
393    }
394
395    fn await_submit_delay_user_transaction(
396        &self,
397        committee: &Committee,
398        tx_digest: &TransactionDigest,
399    ) -> (Duration, usize, usize, usize) {
400        let (position, positions_moved, preceding_disconnected) =
401            self.submission_position(committee, tx_digest);
402
403        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
404        const MIN_LATENCY: Duration = Duration::from_millis(150);
405        const MAX_LATENCY: Duration = Duration::from_secs(3);
406
407        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
408        self.metrics
409            .sequencing_estimated_latency
410            .set(latency.as_millis() as i64);
411
412        let latency = std::cmp::max(latency, MIN_LATENCY);
413        let latency = std::cmp::min(latency, MAX_LATENCY);
414        let latency = latency * 2;
415        let (delay_step, position) =
416            self.override_by_max_submit_position_settings(latency, position);
417
418        self.metrics
419            .sequencing_resubmission_interval_ms
420            .set(delay_step.as_millis() as i64);
421
422        (
423            delay_step * position as u32,
424            position,
425            positions_moved,
426            preceding_disconnected,
427        )
428    }
429
430    /// Overrides the latency and the position if there are defined settings for
431    /// `max_submit_position` and `submit_delay_step_override`. If the
432    /// `max_submit_position` has defined, then that will always be used
433    /// irrespective of any so far decision. Same for the
434    /// `submit_delay_step_override`.
435    fn override_by_max_submit_position_settings(
436        &self,
437        latency: Duration,
438        mut position: usize,
439    ) -> (Duration, usize) {
440        // Respect any manual override for position and latency from the settings
441        if let Some(max_submit_position) = self.max_submit_position {
442            position = std::cmp::min(position, max_submit_position);
443        }
444
445        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
446        (delay_step, position)
447    }
448
449    /// Check when this authority should submit the certificate to consensus.
450    /// This sorts all authorities based on pseudo-random distribution derived
451    /// from transaction hash.
452    ///
453    /// The function targets having 1 consensus transaction submitted per user
454    /// transaction when system operates normally.
455    ///
456    /// The function returns the position of this authority when it is their
457    /// turn to submit the transaction to consensus.
458    fn submission_position(
459        &self,
460        committee: &Committee,
461        tx_digest: &TransactionDigest,
462    ) -> (usize, usize, usize) {
463        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
464
465        self.check_submission_wrt_connectivity_and_scores(positions)
466    }
467
468    /// This function runs the following algorithm to decide whether or not to
469    /// submit a transaction to consensus.
470    ///
471    /// It takes in a deterministic list that represents positions of all the
472    /// authorities. The authority in the first position will be responsible
473    /// for submitting to consensus, and so we check if we are this
474    /// validator, and if so, return true.
475    ///
476    /// If we are not in that position, we check our connectivity to the
477    /// authority in that position. If we are connected to them, we can
478    /// assume that they are operational and will submit the transaction. If
479    /// we are not connected to them, we assume that they are not operational
480    /// and we will not rely on that authority to submit the transaction. So
481    /// we shift them out of the first position, and run this algorithm
482    /// again on the new set of positions.
483    ///
484    /// This can possibly result in a transaction being submitted twice if an
485    /// authority sees a false negative in connectivity to another, such as
486    /// in the case of a network partition.
487    ///
488    /// Recursively, if the authority further ahead of us in the positions is a
489    /// low performing authority, we will move our positions up one, and
490    /// submit the transaction. This allows maintaining performance overall.
491    /// We will only do this part for authorities that are not low performers
492    /// themselves to prevent extra amplification in the case that the
493    /// positions look like [low_scoring_a1, low_scoring_a2, a3]
494    fn check_submission_wrt_connectivity_and_scores(
495        &self,
496        positions: Vec<AuthorityName>,
497    ) -> (usize, usize, usize) {
498        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
499        if low_scoring_authorities.get(&self.authority).is_some() {
500            return (positions.len(), 0, 0);
501        }
502        let initial_position = get_position_in_list(self.authority, positions.clone());
503        let mut preceding_disconnected = 0;
504        let mut before_our_position = true;
505
506        let filtered_positions: Vec<_> = positions
507            .into_iter()
508            .filter(|authority| {
509                let keep = self.authority == *authority; // don't filter ourself out
510                if keep {
511                    before_our_position = false;
512                }
513
514                // filter out any nodes that appear disconnected
515                let connected = self
516                    .connection_monitor_status
517                    .check_connection(&self.authority, authority)
518                    .unwrap_or(ConnectionStatus::Disconnected)
519                    == ConnectionStatus::Connected;
520                if !connected && before_our_position {
521                    preceding_disconnected += 1; // used for metrics
522                }
523
524                // Filter out low scoring nodes
525                let high_scoring = low_scoring_authorities.get(authority).is_none();
526
527                keep || (connected && high_scoring)
528            })
529            .collect();
530
531        let position = get_position_in_list(self.authority, filtered_positions);
532
533        (
534            position,
535            initial_position - position,
536            preceding_disconnected,
537        )
538    }
539
540    /// This method blocks until transaction is persisted in local database
541    /// It then returns handle to async task, user can join this handle to await
542    /// while transaction is processed by consensus
543    ///
544    /// This method guarantees that once submit(but not returned async handle)
545    /// returns, transaction is persisted and will eventually be sent to
546    /// consensus even after restart
547    ///
548    /// When submitting a certificate caller **must** provide a ReconfigState
549    /// lock guard
550    pub fn submit(
551        self: &Arc<Self>,
552        transaction: ConsensusTransaction,
553        lock: Option<&RwLockReadGuard<ReconfigState>>,
554        epoch_store: &Arc<AuthorityPerEpochStore>,
555    ) -> IotaResult<JoinHandle<()>> {
556        self.submit_batch(&[transaction], lock, epoch_store)
557    }
558
559    pub fn submit_batch(
560        self: &Arc<Self>,
561        transactions: &[ConsensusTransaction],
562        lock: Option<&RwLockReadGuard<ReconfigState>>,
563        epoch_store: &Arc<AuthorityPerEpochStore>,
564    ) -> IotaResult<JoinHandle<()>> {
565        if transactions.len() > 1 {
566            // In soft bundle, we need to check if all transactions are of UserTransaction
567            // kind. The check is required because we assume this in
568            // submit_and_wait_inner.
569            for transaction in transactions {
570                fp_ensure!(
571                    matches!(
572                        transaction.kind,
573                        ConsensusTransactionKind::CertifiedTransaction(_)
574                    ),
575                    IotaError::InvalidTxKindInSoftBundle
576                );
577            }
578        }
579
580        epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
581        Ok(self.submit_unchecked(transactions, epoch_store))
582    }
583
584    /// Performs weakly consistent checks on internal buffers to quickly
585    /// discard transactions if we are overloaded
586    pub fn check_limits(&self) -> bool {
587        // First check total transactions (waiting and in submission)
588        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
589            > self.max_pending_transactions
590        {
591            return false;
592        }
593        // Then check if submit_semaphore has permits
594        self.submit_semaphore.available_permits() > 0
595    }
596
597    pub(crate) fn check_consensus_overload(&self) -> IotaResult {
598        fp_ensure!(
599            self.check_limits(),
600            IotaError::TooManyTransactionsPendingConsensus
601        );
602        Ok(())
603    }
604
605    fn submit_unchecked(
606        self: &Arc<Self>,
607        transactions: &[ConsensusTransaction],
608        epoch_store: &Arc<AuthorityPerEpochStore>,
609    ) -> JoinHandle<()> {
610        // Reconfiguration lock is dropped when pending_consensus_transactions is
611        // persisted, before it is handled by consensus
612        let async_stage = self
613            .clone()
614            .submit_and_wait(transactions.to_vec(), epoch_store.clone());
615        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
616        // (Limit is not applied atomically, and only to user transactions.)
617        let join_handle = spawn_monitored_task!(async_stage);
618        join_handle
619    }
620
621    async fn submit_and_wait(
622        self: Arc<Self>,
623        transactions: Vec<ConsensusTransaction>,
624        epoch_store: Arc<AuthorityPerEpochStore>,
625    ) {
626        // When epoch_terminated signal is received all pending submit_and_wait_inner
627        // are dropped.
628        //
629        // This is needed because submit_and_wait_inner waits on read_notify for
630        // consensus message to be processed, which may never happen on epoch
631        // boundary.
632        //
633        // In addition to that, within_alive_epoch ensures that all pending consensus
634        // adapter tasks are stopped before reconfiguration can proceed.
635        //
636        // This is essential because consensus workers reuse same ports when consensus
637        // restarts, this means we might be sending transactions from previous
638        // epochs to consensus of new epoch if we have not had this barrier.
639        epoch_store
640            .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
641            .await
642            .ok(); // result here indicates if epoch ended earlier, we don't
643        // care about it
644    }
645
646    async fn submit_and_wait_inner(
647        self: Arc<Self>,
648        transactions: Vec<ConsensusTransaction>,
649        epoch_store: &Arc<AuthorityPerEpochStore>,
650    ) {
651        if transactions.is_empty() {
652            return;
653        }
654
655        // Current code path ensures:
656        // - If transactions.len() > 1, it is a soft bundle. Otherwise transactions
657        //   should have been submitted individually.
658        // - If is_soft_bundle, then all transactions are of UserTransaction kind.
659        // - If not is_soft_bundle, then transactions must contain exactly 1 tx, and
660        //   transactions[0] can be of any kind.
661        let is_soft_bundle = transactions.len() > 1;
662
663        let mut transaction_keys = Vec::new();
664
665        for transaction in &transactions {
666            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
667                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
668                epoch_store.record_epoch_pending_certs_process_time_metric();
669            }
670
671            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
672            transaction_keys.push(transaction_key);
673        }
674        let tx_type = if !is_soft_bundle {
675            classify(&transactions[0])
676        } else {
677            "soft_bundle"
678        };
679
680        let mut guard = InflightDropGuard::acquire(&self, tx_type);
681
682        // Create the waiter until the node's turn comes to submit to consensus
683        let (await_submit, position, positions_moved, preceding_disconnected) =
684            self.await_submit_delay(epoch_store.committee(), &transactions[..]);
685
686        // Create the waiter until the transaction is processed by consensus or via
687        // checkpoint
688        let processed_via_consensus_or_checkpoint =
689            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
690        pin_mut!(processed_via_consensus_or_checkpoint);
691
692        let processed_waiter = tokio::select! {
693            // We need to wait for some delay until we submit transaction to the consensus
694            _ = await_submit => Some(processed_via_consensus_or_checkpoint),
695
696            // If epoch ends, don't wait for submit delay
697            _ = epoch_store.user_certs_closed_notify() => {
698                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
699                Some(processed_via_consensus_or_checkpoint)
700            }
701
702            // If transaction is received by consensus or checkpoint while we wait, we are done.
703            _ = &mut processed_via_consensus_or_checkpoint => {
704                None
705            }
706        };
707
708        // Log warnings for administrative transactions that fail to get sequenced
709        let _monitor = if !is_soft_bundle
710            && matches!(
711                transactions[0].kind,
712                ConsensusTransactionKind::EndOfPublish(_)
713                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
714                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
715                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
716            ) {
717            let transaction_keys = transaction_keys.clone();
718            Some(CancelOnDrop(spawn_monitored_task!(async {
719                let mut i = 0u64;
720                loop {
721                    i += 1;
722                    const WARN_DELAY_S: u64 = 30;
723                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
724                    let total_wait = i * WARN_DELAY_S;
725                    warn!(
726                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
727                        total_wait, transaction_keys
728                    );
729                }
730            })))
731        } else {
732            None
733        };
734
735        if let Some(processed_waiter) = processed_waiter {
736            debug!("Submitting {:?} to consensus", transaction_keys);
737
738            // populate the position only when this authority submits the transaction
739            // to consensus
740            guard.position = Some(position);
741            guard.positions_moved = Some(positions_moved);
742            guard.preceding_disconnected = Some(preceding_disconnected);
743
744            let _permit: SemaphorePermit = self
745                .submit_semaphore
746                .acquire()
747                .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
748                .await
749                .expect("Consensus adapter does not close semaphore");
750            let _in_flight_submission_guard =
751                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
752
753            // We enter this branch when in select above await_submit completed and
754            // processed_waiter is pending This means it is time for us to
755            // submit transaction to consensus
756            let submit_inner = async {
757                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
758
759                loop {
760                    // Submit the transaction to consensus and return the submit result with a
761                    // status waiter
762                    let status_waiter = self
763                        .submit_inner(
764                            &transactions,
765                            epoch_store,
766                            &transaction_keys,
767                            tx_type,
768                            is_soft_bundle,
769                        )
770                        .await;
771
772                    match status_waiter.await {
773                        Ok(BlockStatusInternal::Sequenced) => {
774                            self.metrics
775                                .sequencing_certificate_status
776                                .with_label_values(&[tx_type, "sequenced"])
777                                .inc();
778                            // Block has been sequenced. Nothing more to do, we do have guarantees
779                            // that the transaction will appear in consensus output.
780                            trace!(
781                                "Transaction {transaction_keys:?} has been sequenced by consensus."
782                            );
783                            break;
784                        }
785                        Ok(BlockStatusInternal::GarbageCollected) => {
786                            self.metrics
787                                .sequencing_certificate_status
788                                .with_label_values(&[tx_type, "garbage_collected"])
789                                .inc();
790                            // Block has been garbage collected and we have no guarantees that the
791                            // transaction will appear in consensus output. We'll
792                            // resubmit the transaction to consensus. If the transaction has been
793                            // already "processed", then probably someone else has submitted
794                            // the transaction and managed to get sequenced. Then this future will
795                            // have been cancelled anyways so no need to check here on the processed
796                            // output.
797                            debug!(
798                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
799                            );
800                            time::sleep(RETRY_DELAY_STEP).await;
801                            continue;
802                        }
803                        Err(err) => {
804                            warn!(
805                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
806                                err
807                            );
808                            time::sleep(RETRY_DELAY_STEP).await;
809                            continue;
810                        }
811                    }
812                }
813            };
814
815            guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
816                Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
817                Either::Right(((), processed_waiter)) => {
818                    debug!("Submitted {transaction_keys:?} to consensus");
819                    processed_waiter.await
820                }
821            };
822        }
823        debug!("{transaction_keys:?} processed by consensus");
824
825        let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
826        epoch_store
827            .remove_pending_consensus_transactions(&consensus_keys)
828            .expect("Storage error when removing consensus transaction");
829
830        let is_user_tx = is_soft_bundle
831            || matches!(
832                transactions[0].kind,
833                ConsensusTransactionKind::CertifiedTransaction(_)
834            );
835        let send_end_of_publish = if is_user_tx {
836            // If we are in RejectUserCerts state and we just drained the list we need to
837            // send EndOfPublish to signal other validators that we are not submitting more
838            // certificates to the epoch. Note that there could be a race
839            // condition here where we enter this check in RejectAllCerts state.
840            // In that case we don't need to send EndOfPublish because condition to enter
841            // RejectAllCerts is when 2f+1 other validators already sequenced their
842            // EndOfPublish message. Also note that we could sent multiple
843            // EndOfPublish due to that multiple tasks can enter here with
844            // pending_count == 0. This doesn't affect correctness.
845            if epoch_store
846                .get_reconfig_state_read_lock_guard()
847                .is_reject_user_certs()
848            {
849                let pending_count = epoch_store.pending_consensus_certificates_count();
850                debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
851                pending_count == 0 // send end of epoch if empty
852            } else {
853                false
854            }
855        } else {
856            false
857        };
858        if send_end_of_publish {
859            // sending message outside of any locks scope
860            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
861            if let Err(err) = self.submit(
862                ConsensusTransaction::new_end_of_publish(self.authority),
863                None,
864                epoch_store,
865            ) {
866                warn!("Error when sending end of publish message: {:?}", err);
867            }
868        }
869        self.metrics
870            .sequencing_certificate_success
871            .with_label_values(&[tx_type])
872            .inc();
873    }
874
875    async fn submit_inner(
876        self: &Arc<Self>,
877        transactions: &[ConsensusTransaction],
878        epoch_store: &Arc<AuthorityPerEpochStore>,
879        transaction_keys: &[SequencedConsensusTransactionKey],
880        tx_type: &str,
881        is_soft_bundle: bool,
882    ) -> BlockStatusReceiver {
883        let ack_start = Instant::now();
884        let mut retries: u32 = 0;
885
886        let status_waiter = loop {
887            match self
888                .consensus_client
889                .submit(transactions, epoch_store)
890                .await
891            {
892                Err(err) => {
893                    // This can happen during reconfig, or when consensus has full internal buffers
894                    // and needs to back pressure, so retry a few times before logging warnings.
895                    if retries > 30
896                        || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
897                    {
898                        warn!(
899                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
900                        );
901                    }
902                    self.metrics
903                        .sequencing_certificate_failures
904                        .with_label_values(&[tx_type])
905                        .inc();
906                    retries += 1;
907
908                    if !is_soft_bundle && transactions[0].kind.is_dkg() {
909                        // Shorter delay for DKG messages, which are time-sensitive and happen at
910                        // start-of-epoch when submit errors due to active reconfig are likely.
911                        time::sleep(Duration::from_millis(100)).await;
912                    } else {
913                        time::sleep(Duration::from_secs(10)).await;
914                    };
915                }
916                Ok(status_waiter) => {
917                    break status_waiter;
918                }
919            }
920        };
921
922        // we want to record the num of retries when reporting latency but to avoid
923        // label cardinality we do some simple bucketing to give us a good
924        // enough idea of how many retries happened associated with the latency.
925        let bucket = match retries {
926            0..=10 => retries.to_string(), // just report the retry count as is
927            11..=20 => "between_10_and_20".to_string(),
928            21..=50 => "between_20_and_50".to_string(),
929            51..=100 => "between_50_and_100".to_string(),
930            _ => "over_100".to_string(),
931        };
932
933        self.metrics
934            .sequencing_acknowledge_latency
935            .with_label_values(&[bucket.as_str(), tx_type])
936            .observe(ack_start.elapsed().as_secs_f64());
937
938        status_waiter
939    }
940
    /// Waits until every one of `transaction_keys` has either appeared in
    /// consensus output or been covered via a checkpoint (state sync).
    ///
    /// Returns [`ProcessedMethod::Consensus`] only if *all* transactions were
    /// observed via consensus; if any one of them was satisfied via a
    /// checkpoint instead, returns [`ProcessedMethod::Checkpoint`].
    async fn await_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            // For user certificates we can also watch for the certificate's
            // digest being executed in a checkpoint; for other kinds the
            // digest list stays empty, which disables that select branch below.
            let transaction_digests = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::Certificate(digest),
            ) = transaction_key
            {
                vec![digest]
            } else {
                vec![]
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to get
                // notified when a checkpoint with equal or higher sequence
                // number has been already synced. This way we don't try to unnecessarily
                // sequence the signature for an already verified checkpoint.
                Either::Left(
                    self.checkpoint_store
                        .notify_read_synced_checkpoint(checkpoint_sequence_number),
                )
            } else {
                // Never resolves, so the corresponding select branch is inert.
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus or
            // executed in a checkpoint. We could equally just get notified in
            // aggregate when all transactions are processed, but with this approach can get
            // notified in a more fine-grained way as transactions can be marked
            // as processed in different ways. This is mostly a concern for the soft-bundle
            // transactions.
            notifications.push(async move {
                tokio::select! {
                    // Early-returns Consensus; the two checkpoint-based branches
                    // fall through to the Checkpoint result below.
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    _ = checkpoint_synced_future => {
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

        // Aggregate: a single checkpoint-processed transaction makes the whole
        // batch count as processed via checkpoint.
        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
1010}
1011
1012impl CheckConnection for ConnectionMonitorStatus {
1013    fn check_connection(
1014        &self,
1015        ourself: &AuthorityName,
1016        authority: &AuthorityName,
1017    ) -> Option<ConnectionStatus> {
1018        if ourself == authority {
1019            return Some(ConnectionStatus::Connected);
1020        }
1021
1022        let mapping = self.authority_names_to_peer_ids.load_full();
1023        let peer_id = match mapping.get(authority) {
1024            Some(p) => p,
1025            None => {
1026                warn!(
1027                    "failed to find peer {:?} in connection monitor listener",
1028                    authority
1029                );
1030                return None;
1031            }
1032        };
1033
1034        let res = match self.connection_statuses.try_get(peer_id) {
1035            TryResult::Present(c) => Some(c.value().clone()),
1036            TryResult::Absent => None,
1037            TryResult::Locked => {
1038                // update is in progress, assume the status is still or becoming disconnected
1039                Some(ConnectionStatus::Disconnected)
1040            }
1041        };
1042        res
1043    }
1044    fn update_mapping_for_epoch(
1045        &self,
1046        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1047    ) {
1048        self.authority_names_to_peer_ids
1049            .swap(Arc::new(authority_names_to_peer_ids));
1050    }
1051}
1052
// Test-only stand-in for the connection monitor: every authority is reported
// as connected and mapping updates are ignored, so tests never see submission
// positions shift due to perceived disconnects.
impl CheckConnection for ConnectionMonitorStatusForTests {
    // Always reports a healthy connection.
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }
    // No mapping is tracked in tests, so this is a no-op.
    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}
1067
1068pub fn get_position_in_list(
1069    search_authority: AuthorityName,
1070    positions: Vec<AuthorityName>,
1071) -> usize {
1072    positions
1073        .into_iter()
1074        .find_position(|authority| *authority == search_authority)
1075        .expect("Couldn't find ourselves in shuffled committee")
1076        .0
1077}
1078
impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// This method is called externally to begin reconfiguration.
    /// It transitions the reconfig state to reject new certificates from
    /// users. ConsensusAdapter will send an EndOfPublish message once the
    /// pending certificate queue is drained.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // Decide under the write lock whether EndOfPublish can be sent
        // immediately (no pending certificates); the message itself is
        // submitted after the lock is released.
        let send_end_of_publish = {
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Allow caller to call this method multiple times
                return;
            }
            let pending_count = epoch_store.pending_consensus_certificates_count();
            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
            let send_end_of_publish = pending_count == 0;
            epoch_store.close_user_certs(reconfig_guard);
            send_end_of_publish
            // reconfig_guard lock is dropped here.
        };
        if send_end_of_publish {
            // Sending the message outside of any lock scope to avoid holding
            // the reconfig lock across a consensus submission.
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
    }
}
1110
/// Guard wrapping a [`JoinHandle`]; the task is aborted when the guard is
/// dropped, so a spawned helper task cannot outlive its owner.
struct CancelOnDrop<T>(JoinHandle<T>);
1112
1113impl<T> Deref for CancelOnDrop<T> {
1114    type Target = JoinHandle<T>;
1115
1116    fn deref(&self) -> &Self::Target {
1117        &self.0
1118    }
1119}
1120
impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        // Cancel the wrapped task so it stops running once the guard goes out
        // of scope.
        self.0.abort();
    }
}
1126
/// Tracks number of inflight consensus requests and relevant metrics
struct InflightDropGuard<'a> {
    /// Adapter whose counters/metrics this guard updates on acquire and drop.
    adapter: &'a ConsensusAdapter,
    /// Creation time; the drop handler observes the elapsed time as the
    /// sequencing latency.
    start: Instant,
    /// Submission position in the shuffled order; only populated when this
    /// authority actually submitted the transaction to consensus.
    position: Option<usize>,
    /// Populated alongside `position` on submission; presumably how many
    /// positions the submission slot moved — TODO confirm exact semantics.
    positions_moved: Option<usize>,
    /// Populated alongside `position` on submission; presumably the number of
    /// disconnected validators preceding us — TODO confirm exact semantics.
    preceding_disconnected: Option<usize>,
    /// Metric label describing the transaction type.
    tx_type: &'static str,
    /// How the transaction ended up being processed; defaults to `Consensus`
    /// and is overwritten by the submitter when known.
    processed_method: ProcessedMethod,
}
1137
/// How a submitted transaction was observed to be processed: sequenced
/// directly by consensus, or covered via a checkpoint instead.
///
/// Derives `Debug` for diagnostics and `Copy`/`Clone` since this is a small
/// fieldless enum that is read in several metric/latency paths.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessedMethod {
    /// The transaction appeared in consensus output.
    Consensus,
    /// The transaction was satisfied via a (synced or executed) checkpoint.
    Checkpoint,
}
1143
1144impl<'a> InflightDropGuard<'a> {
1145    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1146        adapter
1147            .num_inflight_transactions
1148            .fetch_add(1, Ordering::SeqCst);
1149        adapter
1150            .metrics
1151            .sequencing_certificate_inflight
1152            .with_label_values(&[tx_type])
1153            .inc();
1154        adapter
1155            .metrics
1156            .sequencing_certificate_attempt
1157            .with_label_values(&[tx_type])
1158            .inc();
1159        Self {
1160            adapter,
1161            start: Instant::now(),
1162            position: None,
1163            positions_moved: None,
1164            preceding_disconnected: None,
1165            tx_type,
1166            processed_method: ProcessedMethod::Consensus,
1167        }
1168    }
1169}
1170
1171impl Drop for InflightDropGuard<'_> {
1172    fn drop(&mut self) {
1173        self.adapter
1174            .num_inflight_transactions
1175            .fetch_sub(1, Ordering::SeqCst);
1176        self.adapter
1177            .metrics
1178            .sequencing_certificate_inflight
1179            .with_label_values(&[self.tx_type])
1180            .dec();
1181
1182        let position = if let Some(position) = self.position {
1183            self.adapter
1184                .metrics
1185                .sequencing_certificate_authority_position
1186                .observe(position as f64);
1187            position.to_string()
1188        } else {
1189            "not_submitted".to_string()
1190        };
1191
1192        if let Some(positions_moved) = self.positions_moved {
1193            self.adapter
1194                .metrics
1195                .sequencing_certificate_positions_moved
1196                .observe(positions_moved as f64);
1197        };
1198
1199        if let Some(preceding_disconnected) = self.preceding_disconnected {
1200            self.adapter
1201                .metrics
1202                .sequencing_certificate_preceding_disconnected
1203                .observe(preceding_disconnected as f64);
1204        };
1205
1206        let latency = self.start.elapsed();
1207        let processed_method = match self.processed_method {
1208            ProcessedMethod::Consensus => "processed_via_consensus",
1209            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1210        };
1211        self.adapter
1212            .metrics
1213            .sequencing_certificate_latency
1214            .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1215            .observe(latency.as_secs_f64());
1216
1217        // Only sample latency after consensus quorum is up. Otherwise, the wait for
1218        // consensus quorum at the beginning of an epoch can distort the sampled
1219        // latencies. Technically there are more system transaction types that
1220        // can be included in samples after the first consensus commit, but this
1221        // set of types should be enough.
1222        if self.position == Some(0) {
1223            // Transaction types below require quorum existed in the current epoch.
1224            // TODO: refactor tx_type to enum.
1225            let sampled = matches!(
1226                self.tx_type,
1227                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1228            );
1229            // if tx has been processed by checkpoint state sync, then exclude from the
1230            // latency calculations as this can introduce to misleading results.
1231            if sampled && self.processed_method == ProcessedMethod::Consensus {
1232                self.adapter.latency_observer.report(latency);
1233            }
1234        }
1235    }
1236}
1237
1238impl SubmitToConsensus for Arc<ConsensusAdapter> {
1239    fn submit_to_consensus(
1240        &self,
1241        transactions: &[ConsensusTransaction],
1242        epoch_store: &Arc<AuthorityPerEpochStore>,
1243    ) -> IotaResult {
1244        self.submit_batch(transactions, None, epoch_store)
1245            .map(|_| ())
1246    }
1247}
1248
/// Returns this validator's index in the submission order for `tx_digest`:
/// the committee is shuffled by stake from the transaction digest and our
/// position in the shuffled list is returned.
///
/// # Panics
///
/// Panics (via `get_position_in_list`) if `ourselves` is not a member of
/// `committee`.
pub fn position_submit_certificate(
    committee: &Committee,
    ourselves: &AuthorityName,
    tx_digest: &TransactionDigest,
) -> usize {
    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
    get_position_in_list(*ourselves, validators)
}
1257
#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        checkpoints::CheckpointStore,
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        starfish_adapter::LazyStarfishClient,
    };

    /// Builds a committee of `size` authorities with seeded-random voting
    /// power in `0..10`, normalized for testing.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // When we define max submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyStarfishClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        // transaction to submit
        let tx_digest = TransactionDigest::generate(&mut rng);

        // Ensure that the original position is higher
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        // NOTE(review): `!positions_moved` is bitwise NOT on a usize, so this
        // assertion is a tautology for every value except usize::MAX — it does
        // not actually check `positions_moved`. Confirm the intended invariant
        // (e.g. `positions_moved == 0` with an all-connected test monitor).
        assert!(!positions_moved > 0);

        // Make sure that position is set to max value 0
        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        // NOTE(review): same tautology as above — see the earlier comment.
        assert!(!positions_moved > 0);

        // Without submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyStarfishClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        // delay_step * position * 2 = 1 * 7 * 2 = 14
        assert_eq!(delay_step, Duration::from_secs(14));
        // NOTE(review): same tautology as above — see the earlier comment.
        assert!(!positions_moved > 0);
    }

    #[test]
    fn test_position_submit_certificate() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // generate random transaction digests, and account for validator selection
        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                // Every validator's position must be within the committee size.
                assert!(f < committee.num_members());
                if f == 0 {
                    // One and only one validator gets position 0
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}