iota_core/consensus_adapter.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashMap,
    future::Future,
    ops::Deref,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::Instant,
};

use arc_swap::ArcSwap;
use consensus_core::BlockStatus;
use dashmap::{DashMap, try_result::TryResult};
use futures::{
    FutureExt, StreamExt,
    future::{self, Either, select},
    pin_mut,
    stream::FuturesUnordered,
};
use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, spawn_monitored_task};
use iota_simulator::anemo::PeerId;
use iota_types::{
    base_types::{AuthorityName, TransactionDigest},
    committee::Committee,
    error::{IotaError, IotaResult},
    fp_ensure,
    messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
};
use itertools::Itertools;
use parking_lot::RwLockReadGuard;
use prometheus::{
    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use tokio::{
    sync::{Semaphore, SemaphorePermit, oneshot},
    task::JoinHandle,
    time::{self, Duration},
};
use tracing::{debug, info, trace, warn};

use crate::{
    authority::authority_per_epoch_store::AuthorityPerEpochStore,
    connection_monitor::ConnectionStatus,
    consensus_handler::{SequencedConsensusTransactionKey, classify},
    epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
    metrics::LatencyObserver,
};

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;

const SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.1, 0.25, 0.5, 0.75, 1., 1.25, 1.5, 1.75, 2., 2.25, 2.5, 2.75, 3., 4., 5., 6., 7., 10., 15.,
    20., 25., 30., 60., 90., 120., 150., 180., 210., 240., 270., 300.,
];

const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];

pub struct ConsensusAdapterMetrics {
    // Certificate sequencing metrics
    pub sequencing_certificate_attempt: IntCounterVec,
    pub sequencing_certificate_success: IntCounterVec,
    pub sequencing_certificate_failures: IntCounterVec,
    pub sequencing_certificate_status: IntCounterVec,
    pub sequencing_certificate_inflight: IntGaugeVec,
    pub sequencing_acknowledge_latency: HistogramVec,
    pub sequencing_certificate_latency: HistogramVec,
    pub sequencing_certificate_authority_position: Histogram,
    pub sequencing_certificate_positions_moved: Histogram,
    pub sequencing_certificate_preceding_disconnected: Histogram,
    pub sequencing_certificate_processed: IntCounterVec,
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    pub sequencing_in_flight_submissions: IntGauge,
    pub sequencing_estimated_latency: IntGauge,
    pub sequencing_resubmission_interval_ms: IntGauge,
}

impl ConsensusAdapterMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from the sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric.",
                &["retry", "tx_type"],
                SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position but were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            )
            .unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to the local consensus instance and not yet sequenced.",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by the consensus adapter, in milliseconds.",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by the consensus adapter, in milliseconds.",
                registry,
            )
            .unwrap(),
        }
    }

    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }

    pub fn unregister(&self, registry: &Registry) {
        registry
            .unregister(Box::new(self.sequencing_certificate_attempt.clone()))
            .expect("sequencing_certificate_attempt is in registry");
        registry
            .unregister(Box::new(self.sequencing_certificate_success.clone()))
            .expect("sequencing_certificate_success is in registry");
        registry
            .unregister(Box::new(self.sequencing_certificate_failures.clone()))
            .expect("sequencing_certificate_failures is in registry");
        registry
            .unregister(Box::new(self.sequencing_certificate_status.clone()))
            .expect("sequencing_certificate_status is in registry");
        registry
            .unregister(Box::new(self.sequencing_certificate_inflight.clone()))
            .expect("sequencing_certificate_inflight is in registry");
        registry
            .unregister(Box::new(self.sequencing_acknowledge_latency.clone()))
            .expect("sequencing_acknowledge_latency is in registry");
        registry
            .unregister(Box::new(self.sequencing_certificate_latency.clone()))
            .expect("sequencing_certificate_latency is in registry");
        registry
            .unregister(Box::new(
                self.sequencing_certificate_authority_position.clone(),
            ))
            .expect("sequencing_certificate_authority_position is in registry");
        registry
            .unregister(Box::new(
                self.sequencing_certificate_positions_moved.clone(),
            ))
            .expect("sequencing_certificate_positions_moved is in registry");
        registry
            .unregister(Box::new(
                self.sequencing_certificate_preceding_disconnected.clone(),
            ))
            .expect("sequencing_certificate_preceding_disconnected is in registry");
        registry
            .unregister(Box::new(self.sequencing_certificate_processed.clone()))
            .expect("sequencing_certificate_processed is in registry");
        registry
            .unregister(Box::new(self.sequencing_in_flight_semaphore_wait.clone()))
            .expect("sequencing_in_flight_semaphore_wait is in registry");
        registry
            .unregister(Box::new(self.sequencing_in_flight_submissions.clone()))
            .expect("sequencing_in_flight_submissions is in registry");
        registry
            .unregister(Box::new(self.sequencing_estimated_latency.clone()))
            .expect("sequencing_estimated_latency is in registry");
        registry
            .unregister(Box::new(self.sequencing_resubmission_interval_ms.clone()))
            .expect("sequencing_resubmission_interval_ms is in registry");
    }
}

pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;

#[mockall::automock]
#[async_trait::async_trait]
pub trait SubmitToConsensus: Sync + Send + 'static {
    async fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
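
// The `#[mockall::automock]` attributes above generate `MockSubmitToConsensus`
// and `MockConsensusClient` for use in tests. A minimal sketch of stubbing the
// client (assuming the usual mockall `expect_*` API; exact closure signatures
// may differ slightly for async trait methods):
//
//     let mut client = MockConsensusClient::new();
//     client.expect_submit().returning(|_transactions, _epoch_store| {
//         // Hand back a status receiver; a real test would keep the sender
//         // and resolve it with a `BlockStatus` once "consensus" has acted.
//         let (tx, rx) = tokio::sync::oneshot::channel();
//         std::mem::forget(tx); // illustrative only: keeps the channel open
//         Ok(rx)
//     });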

/// Submits IOTA certificates to consensus.
pub struct ConsensusAdapter {
    /// The network client connecting to the consensus node of this authority.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Authority pubkey.
    authority: AuthorityName,
    /// The limit on the number of inflight transactions at this node.
    max_pending_transactions: usize,
    /// Number of submitted transactions still inflight at this node.
    num_inflight_transactions: AtomicU64,
    /// The maximum position from which this authority will submit to
    /// consensus. Even if it is elected to submit from a higher position than
    /// this, it will "reset" to the max_submit_position.
    max_submit_position: Option<usize>,
    /// When provided, overrides the current backoff logic and is used as the
    /// delay step instead.
    submit_delay_step_override: Option<Duration>,
    /// A structure to check the connection statuses populated by the
    /// Connection Monitor Listener.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// A structure to check the reputation scores populated by consensus.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// A structure to register metrics.
    metrics: ConsensusAdapterMetrics,
    /// Semaphore limiting parallel submissions to consensus.
    submit_semaphore: Semaphore,
    latency_observer: LatencyObserver,
}

pub trait CheckConnection: Send + Sync {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}

pub struct ConnectionMonitorStatus {
    /// Current connection statuses forwarded from the connection monitor
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// A map from authority name to peer id
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}

pub struct ConnectionMonitorStatusForTests {}

impl ConsensusAdapter {
    /// Make a new Consensus adapter instance.
    pub fn new(
        consensus_client: Arc<dyn ConsensusClient>,
        authority: AuthorityName,
        connection_monitor_status: Arc<dyn CheckConnection>,
        max_pending_transactions: usize,
        max_pending_local_submissions: usize,
        max_submit_position: Option<usize>,
        submit_delay_step_override: Option<Duration>,
        metrics: ConsensusAdapterMetrics,
    ) -> Self {
        let num_inflight_transactions = Default::default();
        let low_scoring_authorities =
            ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
        Self {
            consensus_client,
            authority,
            max_pending_transactions,
            max_submit_position,
            submit_delay_step_override,
            num_inflight_transactions,
            connection_monitor_status,
            low_scoring_authorities,
            metrics,
            submit_semaphore: Semaphore::new(max_pending_local_submissions),
            latency_observer: LatencyObserver::new(),
        }
    }

    pub fn swap_low_scoring_authorities(
        &self,
        new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    ) {
        self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
    }

    // TODO - this probably needs to hold some kind of lock to make sure the epoch
    // does not change while we are recovering
    pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // Currently the consensus worker might lose transactions on restart, so we
        // need to resend them.
        // TODO: get_all_pending_consensus_transactions is called twice when
        // initializing AuthorityPerEpochStore and here; should not be a big deal but
        // can be optimized.
        let mut recovered = epoch_store.get_all_pending_consensus_transactions();

        #[expect(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
        if epoch_store
            .get_reconfig_state_read_lock_guard()
            .is_reject_user_certs()
            && epoch_store.pending_consensus_certificates_empty()
        {
            if !recovered
                .iter()
                .any(ConsensusTransaction::is_end_of_publish)
            {
                // There are two cases when this is needed:
                // (1) We send the EndOfPublish message after removing pending certificates in
                // submit_and_wait_inner. It is possible that the node will crash between those
                // two steps, in which case we might need to re-introduce the EndOfPublish
                // message on restart.
                // (2) The node crashed inside ConsensusAdapter::close_epoch, after the
                // reconfig lock state was written to the DB and before we persisted the
                // EndOfPublish message.
                recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
            }
        }
        debug!(
            "Submitting {:?} recovered pending consensus transactions to consensus",
            recovered.len()
        );
        for transaction in recovered {
            if transaction.is_end_of_publish() {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
            }
            self.submit_unchecked(&[transaction], epoch_store);
        }
    }

    pub fn unregister_consensus_adapter_metrics(&self, registry: &Registry) {
        self.metrics.unregister(registry);
    }

    fn await_submit_delay(
        &self,
        committee: &Committee,
        transactions: &[ConsensusTransaction],
    ) -> (impl Future<Output = ()>, usize, usize, usize) {
        // Use the minimum digest to compute submit delay.
        let min_digest = transactions
            .iter()
            .filter_map(|tx| match &tx.kind {
                ConsensusTransactionKind::UserTransaction(certificate) => {
                    Some(certificate.digest())
                }
                _ => None,
            })
            .min();

        let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
            Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
            _ => (Duration::ZERO, 0, 0, 0),
        };
        (
            tokio::time::sleep(duration),
            position,
            positions_moved,
            preceding_disconnected,
        )
    }

    fn await_submit_delay_user_transaction(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (Duration, usize, usize, usize) {
        let (position, positions_moved, preceding_disconnected) =
            self.submission_position(committee, tx_digest);

        const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment
        const MIN_LATENCY: Duration = Duration::from_millis(150);
        const MAX_LATENCY: Duration = Duration::from_secs(3);

        let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
        self.metrics
            .sequencing_estimated_latency
            .set(latency.as_millis() as i64);

        let latency = std::cmp::max(latency, MIN_LATENCY);
        let latency = std::cmp::min(latency, MAX_LATENCY);
        let latency = latency * 2;
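
        // Worked example (illustrative values, not from the source): an observed
        // latency of 1.2s lies within [MIN_LATENCY, MAX_LATENCY], so the delay
        // step becomes 2 * 1.2s = 2.4s; an authority at position 3 would then
        // wait 3 * 2.4s = 7.2s before submitting, unless the overrides applied
        // in `override_by_max_submit_position_settings` below change this.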
        let (delay_step, position) =
            self.override_by_max_submit_position_settings(latency, position);

        self.metrics
            .sequencing_resubmission_interval_ms
            .set(delay_step.as_millis() as i64);

        (
            delay_step * position as u32,
            position,
            positions_moved,
            preceding_disconnected,
        )
    }

    /// Overrides the latency and the position if `max_submit_position` or
    /// `submit_delay_step_override` is set. If `max_submit_position` is
    /// defined, it is always applied as an upper bound, irrespective of any
    /// decision made so far. The same holds for `submit_delay_step_override`.
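    ///
    /// For example (illustrative values): with a computed delay step of 2s, a
    /// computed position of 7, and `max_submit_position = Some(1)`, this
    /// returns `(2s, 1)`; with `submit_delay_step_override = Some(500ms)` the
    /// delay step becomes 500ms regardless of the latency estimate.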
    fn override_by_max_submit_position_settings(
        &self,
        latency: Duration,
        mut position: usize,
    ) -> (Duration, usize) {
        // Respect any manual override for position and latency from the settings
        if let Some(max_submit_position) = self.max_submit_position {
            position = std::cmp::min(position, max_submit_position);
        }

        let delay_step = self.submit_delay_step_override.unwrap_or(latency);
        (delay_step, position)
    }

    /// Checks when this authority should submit the certificate to consensus.
    /// This sorts all authorities based on a pseudo-random distribution
    /// derived from the transaction hash.
    ///
    /// The function targets having one consensus transaction submitted per
    /// user transaction when the system operates normally.
    ///
    /// It returns the position of this authority in the submission order,
    /// along with bookkeeping counts used for metrics.
    fn submission_position(
        &self,
        committee: &Committee,
        tx_digest: &TransactionDigest,
    ) -> (usize, usize, usize) {
        let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);

        self.check_submission_wrt_connectivity_and_scores(positions)
    }

    /// This function runs the following algorithm to decide whether or not to
    /// submit a transaction to consensus.
    ///
    /// It takes in a deterministic list that represents the positions of all
    /// the authorities. The authority in the first position is responsible
    /// for submitting to consensus, so we check whether we are that
    /// validator.
    ///
    /// If we are not in that position, we check our connectivity to the
    /// authority in that position. If we are connected to them, we can
    /// assume that they are operational and will submit the transaction. If
    /// we are not connected to them, we assume that they are not operational
    /// and we will not rely on that authority to submit the transaction. So
    /// we shift them out of the first position and run this algorithm again
    /// on the new set of positions.
    ///
    /// This can possibly result in a transaction being submitted twice if an
    /// authority sees a false negative in connectivity to another, such as
    /// in the case of a network partition.
    ///
    /// Similarly, if an authority ahead of us in the positions is a low
    /// performing authority, we move our position up by one and submit the
    /// transaction. This helps maintain overall performance. We only do this
    /// for authorities that are not low performers themselves, to prevent
    /// extra amplification when the positions look like
    /// [low_scoring_a1, low_scoring_a2, a3].
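    ///
    /// For example (illustrative): with positions `[a1, a2, us, a4]`, where
    /// `a1` is disconnected and `a2` is low scoring, both are filtered out,
    /// our effective position becomes 0, and the function returns `(0, 2, 1)`:
    /// position 0, two positions moved, one preceding disconnected authority.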
    fn check_submission_wrt_connectivity_and_scores(
        &self,
        positions: Vec<AuthorityName>,
    ) -> (usize, usize, usize) {
        let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
        if low_scoring_authorities.get(&self.authority).is_some() {
            return (positions.len(), 0, 0);
        }
        let initial_position = get_position_in_list(self.authority, positions.clone());
        let mut preceding_disconnected = 0;
        let mut before_our_position = true;

        let filtered_positions: Vec<_> = positions
            .into_iter()
            .filter(|authority| {
                let keep = self.authority == *authority; // don't filter ourself out
                if keep {
                    before_our_position = false;
                }

                // filter out any nodes that appear disconnected
                let connected = self
                    .connection_monitor_status
                    .check_connection(&self.authority, authority)
                    .unwrap_or(ConnectionStatus::Disconnected)
                    == ConnectionStatus::Connected;
                if !connected && before_our_position {
                    preceding_disconnected += 1; // used for metrics
                }

                // filter out low scoring nodes
                let high_scoring = low_scoring_authorities.get(authority).is_none();

                keep || (connected && high_scoring)
            })
            .collect();

        let position = get_position_in_list(self.authority, filtered_positions);

        (
            position,
            initial_position - position,
            preceding_disconnected,
        )
    }

    /// This method blocks until the transaction is persisted in the local
    /// database. It then returns a handle to an async task that the caller
    /// can join to await processing of the transaction by consensus.
    ///
    /// This method guarantees that once submit (but not the returned async
    /// handle) returns, the transaction is persisted and will eventually be
    /// sent to consensus, even after a restart.
    ///
    /// When submitting a certificate the caller **must** provide a
    /// ReconfigState lock guard.
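    ///
    /// A minimal usage sketch (illustrative; assumes an
    /// `adapter: Arc<ConsensusAdapter>` and an `epoch_store` are at hand):
    ///
    /// ```ignore
    /// let handle = adapter.submit(transaction, None, &epoch_store)?;
    /// handle.await?; // resolves once consensus (or a checkpoint) has processed it
    /// ```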
    pub fn submit(
        self: &Arc<Self>,
        transaction: ConsensusTransaction,
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<JoinHandle<()>> {
        self.submit_batch(&[transaction], lock, epoch_store)
    }

    pub fn submit_batch(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        lock: Option<&RwLockReadGuard<ReconfigState>>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<JoinHandle<()>> {
        if transactions.len() > 1 {
            // In a soft bundle, all transactions must be of UserTransaction kind. The
            // check is required because submit_and_wait_inner assumes this.
            for transaction in transactions {
                fp_ensure!(
                    matches!(
                        transaction.kind,
                        ConsensusTransactionKind::UserTransaction(_)
                    ),
                    IotaError::InvalidTxKindInSoftBundle
                );
            }
        }

        epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
        Ok(self.submit_unchecked(transactions, epoch_store))
    }

    /// Performs weakly consistent checks on internal buffers to quickly
    /// discard transactions if we are overloaded
    pub fn check_limits(&self) -> bool {
        // First check total transactions (waiting and in submission)
        if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
            > self.max_pending_transactions
        {
            return false;
        }
        // Then check if submit_semaphore has permits
        self.submit_semaphore.available_permits() > 0
    }

    pub(crate) fn check_consensus_overload(&self) -> IotaResult {
        fp_ensure!(
            self.check_limits(),
            IotaError::TooManyTransactionsPendingConsensus
        );
        Ok(())
    }

    fn submit_unchecked(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> JoinHandle<()> {
        // Reconfiguration lock is dropped when pending_consensus_transactions is
        // persisted, before it is handled by consensus
        let async_stage = self
            .clone()
            .submit_and_wait(transactions.to_vec(), epoch_store.clone());
        // Number of these tasks is weakly limited based on `num_inflight_transactions`.
        // (Limit is not applied atomically, and only to user transactions.)
        spawn_monitored_task!(async_stage)
    }

    async fn submit_and_wait(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        // When the epoch_terminated signal is received, all pending
        // submit_and_wait_inner futures are dropped.
        //
        // This is needed because submit_and_wait_inner waits on read_notify for the
        // consensus message to be processed, which may never happen on an epoch
        // boundary.
        //
        // In addition, within_alive_epoch ensures that all pending consensus adapter
        // tasks are stopped before reconfiguration can proceed.
        //
        // This is essential because consensus workers reuse the same ports when
        // consensus restarts, which means that without this barrier we might be
        // sending transactions from previous epochs to the consensus of a new epoch.
        epoch_store
            .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
            .await
            .ok(); // the result indicates whether the epoch ended earlier; we don't care about it
    }

    async fn submit_and_wait_inner(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        if transactions.is_empty() {
            return;
        }

        // The current code path ensures:
        // - If transactions.len() > 1, it is a soft bundle. Otherwise the transactions
        //   would have been submitted individually.
        // - If is_soft_bundle, then all transactions are of UserTransaction kind.
        // - If not is_soft_bundle, then transactions contains exactly 1 tx, and
        //   transactions[0] can be of any kind.
        let is_soft_bundle = transactions.len() > 1;

        let mut transaction_keys = Vec::new();

        for transaction in &transactions {
            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
                epoch_store.record_epoch_pending_certs_process_time_metric();
            }

            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
            transaction_keys.push(transaction_key);
        }
        let tx_type = if !is_soft_bundle {
            classify(&transactions[0])
        } else {
            "soft_bundle"
        };

        let mut guard = InflightDropGuard::acquire(&self, tx_type);

        // Create the waiter until the node's turn comes to submit to consensus.
        let (await_submit, position, positions_moved, preceding_disconnected) =
            self.await_submit_delay(epoch_store.committee(), &transactions[..]);

        // Create the waiter until the transaction is processed by consensus or via a
        // checkpoint.
        let processed_via_consensus_or_checkpoint =
            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
        pin_mut!(processed_via_consensus_or_checkpoint);

        let processed_waiter = tokio::select! {
            // We need to wait for some delay until we submit the transaction to consensus
            _ = await_submit => Some(processed_via_consensus_or_checkpoint),

            // If the epoch ends, don't wait for the submit delay
            _ = epoch_store.user_certs_closed_notify() => {
                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
                Some(processed_via_consensus_or_checkpoint)
            }

            // If the transaction is received by consensus or a checkpoint while we wait, we are done.
            _ = &mut processed_via_consensus_or_checkpoint => {
                None
            }
        };

        // Log warnings for administrative transactions that fail to get sequenced
        let _monitor = if !is_soft_bundle
            && matches!(
                transactions[0].kind,
                ConsensusTransactionKind::EndOfPublish(_)
                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
            ) {
            let transaction_keys = transaction_keys.clone();
            Some(CancelOnDrop(spawn_monitored_task!(async {
                let mut i = 0u64;
                loop {
                    i += 1;
                    const WARN_DELAY_S: u64 = 30;
                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                    let total_wait = i * WARN_DELAY_S;
                    warn!(
                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
                        total_wait, transaction_keys
                    );
                }
            })))
        } else {
            None
        };

        if let Some(processed_waiter) = processed_waiter {
            debug!("Submitting {:?} to consensus", transaction_keys);

            // Populate the position only when this authority submits the transaction
            // to consensus.
            guard.position = Some(position);
            guard.positions_moved = Some(positions_moved);
            guard.preceding_disconnected = Some(preceding_disconnected);

            let _permit: SemaphorePermit = self
                .submit_semaphore
                .acquire()
                .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
                .await
                .expect("Consensus adapter does not close semaphore");
            let _in_flight_submission_guard =
                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

            // We enter this branch when, in the select above, await_submit completed
            // while processed_waiter is still pending. This means it is time for us to
            // submit the transaction to consensus.
            let submit_inner = async {
                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

                loop {
                    // Submit the transaction to consensus and return the submit result with a
                    // status waiter.
                    let status_waiter = self
                        .submit_inner(
                            &transactions,
                            epoch_store,
                            &transaction_keys,
                            tx_type,
                            is_soft_bundle,
                        )
                        .await;

                    match status_waiter.await {
                        Ok(BlockStatus::Sequenced(_)) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "sequenced"])
                                .inc();
                            // The block has been sequenced. Nothing more to do: we have
                            // guarantees that the transaction will appear in the consensus
                            // output.
                            trace!(
                                "Transaction {transaction_keys:?} has been sequenced by consensus."
                            );
                            break;
                        }
                        Ok(BlockStatus::GarbageCollected(_)) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "garbage_collected"])
                                .inc();
                            // The block has been garbage collected and we have no guarantees
                            // that the transaction will appear in the consensus output, so we
                            // resubmit the transaction to consensus. If the transaction has
                            // already been "processed", then probably someone else submitted
                            // it and managed to get it sequenced; in that case this future
                            // will have been cancelled anyway, so there is no need to check
                            // the processed output here.
                            debug!(
                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                        Err(err) => {
                            warn!(
                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                                err
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                    }
                }
            };

            guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
                Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
                Either::Right(((), processed_waiter)) => {
                    debug!("Submitted {transaction_keys:?} to consensus");
                    processed_waiter.await
                }
            };
        }
        debug!("{transaction_keys:?} processed by consensus");

        let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
        epoch_store
            .remove_pending_consensus_transactions(&consensus_keys)
            .expect("Storage error when removing consensus transaction");

        let is_user_tx = is_soft_bundle
            || matches!(
                transactions[0].kind,
                ConsensusTransactionKind::UserTransaction(_)
            );
        let send_end_of_publish = if is_user_tx {
            // If we are in RejectUserCerts state and we just drained the list, we need to
            // send EndOfPublish to signal to other validators that we are not submitting
            // more certificates to this epoch. Note that there could be a race condition
            // here where we enter this check in RejectAllCerts state; in that case we
            // don't need to send EndOfPublish, because the condition to enter
            // RejectAllCerts is that 2f+1 other validators have already sequenced their
            // EndOfPublish messages. Also note that we could send multiple EndOfPublish
            // messages, since multiple tasks can enter here with pending_count == 0.
            // This doesn't affect correctness.
            if epoch_store
                .get_reconfig_state_read_lock_guard()
                .is_reject_user_certs()
            {
                let pending_count = epoch_store.pending_consensus_certificates_count();
                debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
                pending_count == 0 // send end of epoch if empty
            } else {
                false
            }
        } else {
            false
        };
        if send_end_of_publish {
            // Send the message outside of any lock scope.
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
        self.metrics
            .sequencing_certificate_success
            .with_label_values(&[tx_type])
            .inc();
    }

    async fn submit_inner(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        tx_type: &str,
        is_soft_bundle: bool,
    ) -> BlockStatusReceiver {
        let ack_start = Instant::now();
        let mut retries: u32 = 0;

        let status_waiter = loop {
            match self
                .consensus_client
                .submit(transactions, epoch_store)
                .await
            {
                Err(err) => {
                    // This can happen during reconfig, or when consensus has full internal
                    // buffers and needs to apply backpressure, so retry a few times before
                    // logging warnings.
                    if retries > 30
                        || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
                    {
                        warn!(
                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
                        );
                    }
                    self.metrics
                        .sequencing_certificate_failures
                        .with_label_values(&[tx_type])
                        .inc();
                    retries += 1;

                    if !is_soft_bundle && transactions[0].kind.is_dkg() {
                        // Use a shorter delay for DKG messages, which are time-sensitive and
                        // happen at start-of-epoch, when submit errors due to active reconfig
                        // are likely.
                        time::sleep(Duration::from_millis(100)).await;
                    } else {
                        time::sleep(Duration::from_secs(10)).await;
                    };
                }
                Ok(status_waiter) => {
                    break status_waiter;
                }
            }
        };

        // We want to record the number of retries when reporting latency, but to
        // avoid label cardinality we do some simple bucketing, which still gives a
        // good enough idea of how many retries were associated with the latency.
        let bucket = match retries {
            0..=10 => retries.to_string(), // just report the retry count as is
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        };

        self.metrics
            .sequencing_acknowledge_latency
            .with_label_values(&[bucket.as_str(), tx_type])
            .observe(ack_start.elapsed().as_secs_f64());

        status_waiter
    }

    /// Waits for the transactions to appear in consensus output or to be
    /// executed via a checkpoint (state sync).
    /// Returns how the transactions were processed: via consensus or via a
    /// (synced) checkpoint.
    async fn await_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            let transaction_digests = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::Certificate(digest),
            ) = transaction_key
            {
                vec![digest]
            } else {
                vec![]
            };

            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
            ) = transaction_key
            {
                // If the transaction is a checkpoint signature, we can also wait to get
                // notified when a checkpoint with an equal or higher sequence number has
                // already been synced. This way we don't try to unnecessarily sequence the
                // signature for an already verified checkpoint.
                Either::Left(epoch_store.synced_checkpoint_notify(checkpoint_sequence_number))
            } else {
                Either::Right(future::pending())
            };

            // We wait for each transaction individually to be processed by consensus or
            // executed in a checkpoint. We could equally get notified in aggregate when
            // all transactions are processed, but with this approach we get notified in
            // a more fine-grained way, since transactions can be marked as processed in
            // different ways. This is mostly a concern for soft-bundle transactions.
            notifications.push(async move {
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    processed = checkpoint_synced_future => {
                        processed.expect("Error when waiting for checkpoint sequence number");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                ProcessedMethod::Checkpoint
            });
        }

        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
}

impl CheckConnection for ConnectionMonitorStatus {
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        if ourself == authority {
            return Some(ConnectionStatus::Connected);
        }

        let mapping = self.authority_names_to_peer_ids.load_full();
        let peer_id = match mapping.get(authority) {
            Some(p) => p,
            None => {
                warn!(
                    "failed to find peer {:?} in connection monitor listener",
                    authority
                );
                return None;
            }
        };

        match self.connection_statuses.try_get(peer_id) {
            TryResult::Present(c) => Some(c.value().clone()),
            TryResult::Absent => None,
            TryResult::Locked => {
                // an update is in progress; assume the status is still or becoming disconnected
                Some(ConnectionStatus::Disconnected)
            }
        }
    }

    fn update_mapping_for_epoch(
        &self,
        authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
        self.authority_names_to_peer_ids
            .swap(Arc::new(authority_names_to_peer_ids));
    }
}

impl CheckConnection for ConnectionMonitorStatusForTests {
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }

    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}

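/// Returns the zero-based index of `search_authority` in `positions`.
///
/// Panics if the authority is not present, which should not happen when
/// `positions` is a shuffle of a committee this authority belongs to.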
pub fn get_position_in_list(
    search_authority: AuthorityName,
    positions: Vec<AuthorityName>,
) -> usize {
    positions
        .into_iter()
        .find_position(|authority| *authority == search_authority)
        .expect("Couldn't find ourselves in shuffled committee")
        .0
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// This method is called externally to begin reconfiguration.
    /// It transitions the reconfig state to reject new certificates from the
    /// user. ConsensusAdapter will send an EndOfPublish message once the
    /// pending certificate queue is drained.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        let send_end_of_publish = {
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Allow the caller to call this method multiple times
                return;
            }
            let pending_count = epoch_store.pending_consensus_certificates_count();
            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
            let send_end_of_publish = pending_count == 0;
            epoch_store.close_user_certs(reconfig_guard);
            send_end_of_publish
            // reconfig_guard lock is dropped here.
        };
        if send_end_of_publish {
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
    }
}

struct CancelOnDrop<T>(JoinHandle<T>);

impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Drop for CancelOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

/// Tracks number of inflight consensus requests and relevant metrics
struct InflightDropGuard<'a> {
    adapter: &'a ConsensusAdapter,
    start: Instant,
    position: Option<usize>,
    positions_moved: Option<usize>,
    preceding_disconnected: Option<usize>,
    tx_type: &'static str,
    processed_method: ProcessedMethod,
}

#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    Consensus,
    Checkpoint,
}

impl<'a> InflightDropGuard<'a> {
    pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
        adapter
            .num_inflight_transactions
            .fetch_add(1, Ordering::SeqCst);
        adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[tx_type])
            .inc();
        adapter
            .metrics
            .sequencing_certificate_attempt
            .with_label_values(&[tx_type])
            .inc();
        Self {
            adapter,
            start: Instant::now(),
            position: None,
            positions_moved: None,
            preceding_disconnected: None,
            tx_type,
            processed_method: ProcessedMethod::Consensus,
        }
    }
}

impl Drop for InflightDropGuard<'_> {
    fn drop(&mut self) {
        self.adapter
            .num_inflight_transactions
            .fetch_sub(1, Ordering::SeqCst);
        self.adapter
            .metrics
            .sequencing_certificate_inflight
            .with_label_values(&[self.tx_type])
            .dec();

        let position = if let Some(position) = self.position {
            self.adapter
                .metrics
                .sequencing_certificate_authority_position
                .observe(position as f64);
            position.to_string()
        } else {
            "not_submitted".to_string()
        };

        if let Some(positions_moved) = self.positions_moved {
            self.adapter
                .metrics
                .sequencing_certificate_positions_moved
                .observe(positions_moved as f64);
        };

        if let Some(preceding_disconnected) = self.preceding_disconnected {
            self.adapter
                .metrics
                .sequencing_certificate_preceding_disconnected
                .observe(preceding_disconnected as f64);
        };

        let latency = self.start.elapsed();
        let processed_method = match self.processed_method {
            ProcessedMethod::Consensus => "processed_via_consensus",
            ProcessedMethod::Checkpoint => "processed_via_checkpoint",
        };
        self.adapter
            .metrics
            .sequencing_certificate_latency
            .with_label_values(&[position.as_str(), self.tx_type, processed_method])
            .observe(latency.as_secs_f64());

        // Only sample latency after consensus quorum is up. Otherwise, the wait for
        // consensus quorum at the beginning of an epoch can distort the sampled
        // latencies. Technically there are more system transaction types that
        // can be included in samples after the first consensus commit, but this
        // set of types should be enough.
        if self.position == Some(0) {
            // The transaction types below require a quorum to exist in the current
            // epoch. TODO: refactor tx_type to an enum.
            let sampled = matches!(
                self.tx_type,
                "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
            );
            // If the tx has been processed via checkpoint state sync, exclude it from
            // the latency calculations, since including it can lead to misleading
            // results.
            if sampled && self.processed_method == ProcessedMethod::Consensus {
                self.adapter.latency_observer.report(latency);
            }
        }
    }
}
1273
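// Convenience impl: forwards the batch to `submit_batch` and discards its
// return value.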
#[async_trait::async_trait]
impl SubmitToConsensus for Arc<ConsensusAdapter> {
    async fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        self.submit_batch(transactions, None, epoch_store)
            .map(|_| ())
    }
}

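/// Returns this authority's position in the stake-weighted submission order
/// derived from the transaction digest; the validator at position 0 is expected
/// to submit first.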
pub fn position_submit_certificate(
    committee: &Committee,
    ourselves: &AuthorityName,
    tx_digest: &TransactionDigest,
) -> usize {
    let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
    get_position_in_list(*ourselves, validators)
}

#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use prometheus::Registry;
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        mysticeti_adapter::LazyMysticetiClient,
    };

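    // Builds a test committee of `size` authorities with random stakes drawn from
    // 0..10, normalized into voting power.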
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

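    // Verifies that a configured max submit position caps the computed position and
    // that a configured delay step overrides the default submission backoff.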
    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // When we define a max submit position and a delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        // transaction to submit
        let tx_digest = TransactionDigest::generate(&mut rng);

        // Ensure that the original position is higher than the max submit position
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        // Make sure that the position is capped at the max submit position (1)
        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Without a max submit position and delay step
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        // default delay step * position * 2 = 1s * 7 * 2 = 14s
        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }

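    // Property check: for every digest, each validator's position is within range
    // and exactly one validator lands at position 0.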
    #[test]
    fn test_position_submit_certificate() {
        // grab a random committee and a random stake distribution
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // generate random transaction digests, and account for validator selection
        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // One and only one validator gets position 0
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }

    #[tokio::test]
    #[should_panic]
    async fn test_reregister_consensus_adapter_metrics() {
        let registry = Registry::new();
        // create the metrics for the first time
        let _metrics = ConsensusAdapterMetrics::new(&registry);
        // creating the metrics again in the same registry without unregistering
        // should panic
        let _metrics = ConsensusAdapterMetrics::new(&registry);
    }

    #[tokio::test]
    async fn test_unregister_consensus_adapter_metrics() {
        let registry = Registry::new();

        // create the metrics for the first time
        let metrics = ConsensusAdapterMetrics::new(&registry);
        // use a metric
        metrics
            .sequencing_certificate_attempt
            .with_label_values(&["tx"])
            .inc_by(1);
        assert_eq!(
            1,
            metrics
                .sequencing_certificate_attempt
                .with_label_values(&["tx"])
                .get()
        );
        // should not panic
        metrics.unregister(&registry);
        // the metrics can safely be used after unregistering
        metrics
            .sequencing_certificate_attempt
            .with_label_values(&["tx"])
            .inc_by(1);

        // create new metrics in the same registry
        let metrics = ConsensusAdapterMetrics::new(&registry);
        // the counters start fresh
        assert_eq!(
            0,
            metrics
                .sequencing_certificate_attempt
                .with_label_values(&["tx"])
                .get()
        );
        // and can be unregistered again
        metrics.unregister(&registry);
    }
}