1use std::{
6 collections::HashMap,
7 future::Future,
8 ops::Deref,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use consensus_core::BlockStatus;
18use dashmap::{DashMap, try_result::TryResult};
19use futures::{
20 FutureExt, StreamExt,
21 future::{self, Either, select},
22 pin_mut,
23 stream::FuturesUnordered,
24};
25use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, spawn_monitored_task};
26use iota_simulator::anemo::PeerId;
27use iota_types::{
28 base_types::{AuthorityName, TransactionDigest},
29 committee::Committee,
30 error::{IotaError, IotaResult},
31 fp_ensure,
32 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
33};
34use itertools::Itertools;
35use parking_lot::RwLockReadGuard;
36use prometheus::{
37 Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
38 register_histogram_vec_with_registry, register_histogram_with_registry,
39 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
40 register_int_gauge_with_registry,
41};
42use tokio::{
43 sync::{Semaphore, SemaphorePermit, oneshot},
44 task::JoinHandle,
45 time::{self, Duration},
46};
47use tracing::{debug, info, trace, warn};
48
49use crate::{
50 authority::authority_per_epoch_store::AuthorityPerEpochStore,
51 connection_monitor::ConnectionStatus,
52 consensus_handler::{SequencedConsensusTransactionKey, classify},
53 epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
54 metrics::LatencyObserver,
55};
56
57#[cfg(test)]
58#[path = "unit_tests/consensus_tests.rs"]
59pub mod consensus_tests;
60
/// Histogram bucket boundaries, in seconds, used by the latency histograms
/// (`sequencing_acknowledge_latency` and `sequencing_certificate_latency`).
const SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 4.0, 5.0, 6.0, 7.0,
    10.0, 15.0, 20.0, 25.0, 30.0, 60.0, 90.0, 120.0, 150.0, 180.0, 210.0, 240.0, 270.0, 300.0,
];
65
/// Histogram bucket boundaries used by the position-based histograms
/// (authority position, positions moved, preceding disconnected).
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0.0, 1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 50.0, 100.0, 150.0, 200.0,
];
69
70pub struct ConsensusAdapterMetrics {
71 pub sequencing_certificate_attempt: IntCounterVec,
73 pub sequencing_certificate_success: IntCounterVec,
74 pub sequencing_certificate_failures: IntCounterVec,
75 pub sequencing_certificate_status: IntCounterVec,
76 pub sequencing_certificate_inflight: IntGaugeVec,
77 pub sequencing_acknowledge_latency: HistogramVec,
78 pub sequencing_certificate_latency: HistogramVec,
79 pub sequencing_certificate_authority_position: Histogram,
80 pub sequencing_certificate_positions_moved: Histogram,
81 pub sequencing_certificate_preceding_disconnected: Histogram,
82 pub sequencing_certificate_processed: IntCounterVec,
83 pub sequencing_in_flight_semaphore_wait: IntGauge,
84 pub sequencing_in_flight_submissions: IntGauge,
85 pub sequencing_estimated_latency: IntGauge,
86 pub sequencing_resubmission_interval_ms: IntGauge,
87}
88
89impl ConsensusAdapterMetrics {
90 pub fn new(registry: &Registry) -> Self {
91 Self {
92 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
93 "sequencing_certificate_attempt",
94 "Counts the number of certificates the validator attempts to sequence.",
95 &["tx_type"],
96 registry,
97 )
98 .unwrap(),
99 sequencing_certificate_success: register_int_counter_vec_with_registry!(
100 "sequencing_certificate_success",
101 "Counts the number of successfully sequenced certificates.",
102 &["tx_type"],
103 registry,
104 )
105 .unwrap(),
106 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
107 "sequencing_certificate_failures",
108 "Counts the number of sequenced certificates that failed other than by timeout.",
109 &["tx_type"],
110 registry,
111 )
112 .unwrap(),
113 sequencing_certificate_status: register_int_counter_vec_with_registry!(
114 "sequencing_certificate_status",
115 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
116 &["tx_type", "status"],
117 registry,
118 )
119 .unwrap(),
120 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
121 "sequencing_certificate_inflight",
122 "The inflight requests to sequence certificates.",
123 &["tx_type"],
124 registry,
125 )
126 .unwrap(),
127 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
128 "sequencing_acknowledge_latency",
129 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
130 &["retry", "tx_type"],
131 SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
132 registry,
133 )
134 .unwrap(),
135 sequencing_certificate_latency: register_histogram_vec_with_registry!(
136 "sequencing_certificate_latency",
137 "The latency for sequencing a certificate.",
138 &["position", "tx_type", "processed_method"],
139 SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
140 registry,
141 ).unwrap(),
142 sequencing_certificate_authority_position: register_histogram_with_registry!(
143 "sequencing_certificate_authority_position",
144 "The position of the authority when submitted a certificate to consensus.",
145 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
146 registry,
147 ).unwrap(),
148 sequencing_certificate_positions_moved: register_histogram_with_registry!(
149 "sequencing_certificate_positions_moved",
150 "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
151 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
152 registry,
153 ).unwrap(),
154 sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
155 "sequencing_certificate_preceding_disconnected",
156 "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
157 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
158 registry,
159 ).unwrap(),
160 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
161 "sequencing_certificate_processed",
162 "The number of certificates that have been processed either by consensus or checkpoint.",
163 &["source"],
164 registry
165 ).unwrap(),
166 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
167 "sequencing_in_flight_semaphore_wait",
168 "How many requests are blocked on submit_permit.",
169 registry,
170 )
171 .unwrap(),
172 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
173 "sequencing_in_flight_submissions",
174 "Number of transactions submitted to local consensus instance and not yet sequenced",
175 registry,
176 )
177 .unwrap(),
178 sequencing_estimated_latency: register_int_gauge_with_registry!(
179 "sequencing_estimated_latency",
180 "Consensus latency estimated by consensus adapter in milliseconds",
181 registry,
182 )
183 .unwrap(),
184 sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
185 "sequencing_resubmission_interval_ms",
186 "Resubmission interval used by consensus adapter in milliseconds",
187 registry,
188 )
189 .unwrap(),
190 }
191 }
192
193 pub fn new_test() -> Self {
194 Self::new(&Registry::default())
195 }
196
197 pub fn unregister(&self, registry: &Registry) {
198 registry
199 .unregister(Box::new(self.sequencing_certificate_attempt.clone()))
200 .expect("sequencing_certificate_attempt is in registry");
201 registry
202 .unregister(Box::new(self.sequencing_certificate_success.clone()))
203 .expect("sequencing_certificate_success is in registry");
204 registry
205 .unregister(Box::new(self.sequencing_certificate_failures.clone()))
206 .expect("sequencing_certificate_failures is in registry");
207 registry
208 .unregister(Box::new(self.sequencing_certificate_status.clone()))
209 .expect("sequencing_certificate_status is in registry");
210 registry
211 .unregister(Box::new(self.sequencing_certificate_inflight.clone()))
212 .expect("sequencing_certificate_inflight is in registry");
213 registry
214 .unregister(Box::new(self.sequencing_acknowledge_latency.clone()))
215 .expect("sequencing_acknowledge_latency is in registry");
216 registry
217 .unregister(Box::new(self.sequencing_certificate_latency.clone()))
218 .expect("sequencing_certificate_latency is in registry");
219 registry
220 .unregister(Box::new(
221 self.sequencing_certificate_authority_position.clone(),
222 ))
223 .expect("sequencing_certificate_authority_position is in registry");
224 registry
225 .unregister(Box::new(
226 self.sequencing_certificate_positions_moved.clone(),
227 ))
228 .expect("sequencing_certificate_positions_moved is in registry");
229 registry
230 .unregister(Box::new(
231 self.sequencing_certificate_preceding_disconnected.clone(),
232 ))
233 .expect("sequencing_certificate_preceding_disconnected is in registry");
234 registry
235 .unregister(Box::new(self.sequencing_certificate_processed.clone()))
236 .expect("sequencing_certificate_processed is in registry");
237 registry
238 .unregister(Box::new(self.sequencing_in_flight_semaphore_wait.clone()))
239 .expect("sequencing_in_flight_semaphore_wait is in registry");
240 registry
241 .unregister(Box::new(self.sequencing_in_flight_submissions.clone()))
242 .expect("sequencing_in_flight_submissions is in registry");
243 registry
244 .unregister(Box::new(self.sequencing_estimated_latency.clone()))
245 .expect("sequencing_estimated_latency is in registry");
246 registry
247 .unregister(Box::new(self.sequencing_resubmission_interval_ms.clone()))
248 .expect("sequencing_resubmission_interval_ms is in registry");
249 }
250}
251
252pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
253
254#[mockall::automock]
255#[async_trait::async_trait]
256pub trait SubmitToConsensus: Sync + Send + 'static {
257 async fn submit_to_consensus(
258 &self,
259 transactions: &[ConsensusTransaction],
260 epoch_store: &Arc<AuthorityPerEpochStore>,
261 ) -> IotaResult;
262}
263
264#[mockall::automock]
265#[async_trait::async_trait]
266pub trait ConsensusClient: Sync + Send + 'static {
267 async fn submit(
268 &self,
269 transactions: &[ConsensusTransaction],
270 epoch_store: &Arc<AuthorityPerEpochStore>,
271 ) -> IotaResult<BlockStatusReceiver>;
272}
273
274pub struct ConsensusAdapter {
276 consensus_client: Arc<dyn ConsensusClient>,
278 authority: AuthorityName,
280 max_pending_transactions: usize,
282 num_inflight_transactions: AtomicU64,
284 max_submit_position: Option<usize>,
288 submit_delay_step_override: Option<Duration>,
291 connection_monitor_status: Arc<dyn CheckConnection>,
294 low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
296 metrics: ConsensusAdapterMetrics,
298 submit_semaphore: Semaphore,
300 latency_observer: LatencyObserver,
301}
302
303pub trait CheckConnection: Send + Sync {
304 fn check_connection(
305 &self,
306 ourself: &AuthorityName,
307 authority: &AuthorityName,
308 ) -> Option<ConnectionStatus>;
309 fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
310}
311
312pub struct ConnectionMonitorStatus {
313 pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
315 pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
317}
318
/// `CheckConnection` stand-in that reports every authority as connected.
pub struct ConnectionMonitorStatusForTests {}
320
321impl ConsensusAdapter {
322 pub fn new(
324 consensus_client: Arc<dyn ConsensusClient>,
325 authority: AuthorityName,
326 connection_monitor_status: Arc<dyn CheckConnection>,
327 max_pending_transactions: usize,
328 max_pending_local_submissions: usize,
329 max_submit_position: Option<usize>,
330 submit_delay_step_override: Option<Duration>,
331 metrics: ConsensusAdapterMetrics,
332 ) -> Self {
333 let num_inflight_transactions = Default::default();
334 let low_scoring_authorities =
335 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
336 Self {
337 consensus_client,
338 authority,
339 max_pending_transactions,
340 max_submit_position,
341 submit_delay_step_override,
342 num_inflight_transactions,
343 connection_monitor_status,
344 low_scoring_authorities,
345 metrics,
346 submit_semaphore: Semaphore::new(max_pending_local_submissions),
347 latency_observer: LatencyObserver::new(),
348 }
349 }
350
351 pub fn swap_low_scoring_authorities(
352 &self,
353 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
354 ) {
355 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
356 }
357
358 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
361 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
367
368 #[expect(clippy::collapsible_if)] if epoch_store
370 .get_reconfig_state_read_lock_guard()
371 .is_reject_user_certs()
372 && epoch_store.pending_consensus_certificates_empty()
373 {
374 if recovered
375 .iter()
376 .any(ConsensusTransaction::is_end_of_publish)
377 {
378 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
387 }
388 }
389 debug!(
390 "Submitting {:?} recovered pending consensus transactions to consensus",
391 recovered.len()
392 );
393 for transaction in recovered {
394 if transaction.is_end_of_publish() {
395 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
396 }
397 self.submit_unchecked(&[transaction], epoch_store);
398 }
399 }
400
401 pub fn unregister_consensus_adapter_metrics(&self, registry: &Registry) {
402 self.metrics.unregister(registry);
403 }
404
405 fn await_submit_delay(
406 &self,
407 committee: &Committee,
408 transactions: &[ConsensusTransaction],
409 ) -> (impl Future<Output = ()>, usize, usize, usize) {
410 let min_digest = transactions
412 .iter()
413 .filter_map(|tx| match &tx.kind {
414 ConsensusTransactionKind::UserTransaction(certificate) => {
415 Some(certificate.digest())
416 }
417 _ => None,
418 })
419 .min();
420
421 let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
422 Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
423 _ => (Duration::ZERO, 0, 0, 0),
424 };
425 (
426 tokio::time::sleep(duration),
427 position,
428 positions_moved,
429 preceding_disconnected,
430 )
431 }
432
433 fn await_submit_delay_user_transaction(
434 &self,
435 committee: &Committee,
436 tx_digest: &TransactionDigest,
437 ) -> (Duration, usize, usize, usize) {
438 let (position, positions_moved, preceding_disconnected) =
439 self.submission_position(committee, tx_digest);
440
441 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
443 const MAX_LATENCY: Duration = Duration::from_secs(3);
444
445 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
446 self.metrics
447 .sequencing_estimated_latency
448 .set(latency.as_millis() as i64);
449
450 let latency = std::cmp::max(latency, MIN_LATENCY);
451 let latency = std::cmp::min(latency, MAX_LATENCY);
452 let latency = latency * 2;
453 let (delay_step, position) =
454 self.override_by_max_submit_position_settings(latency, position);
455
456 self.metrics
457 .sequencing_resubmission_interval_ms
458 .set(delay_step.as_millis() as i64);
459
460 (
461 delay_step * position as u32,
462 position,
463 positions_moved,
464 preceding_disconnected,
465 )
466 }
467
468 fn override_by_max_submit_position_settings(
474 &self,
475 latency: Duration,
476 mut position: usize,
477 ) -> (Duration, usize) {
478 if let Some(max_submit_position) = self.max_submit_position {
480 position = std::cmp::min(position, max_submit_position);
481 }
482
483 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
484 (delay_step, position)
485 }
486
487 fn submission_position(
497 &self,
498 committee: &Committee,
499 tx_digest: &TransactionDigest,
500 ) -> (usize, usize, usize) {
501 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
502
503 self.check_submission_wrt_connectivity_and_scores(positions)
504 }
505
506 fn check_submission_wrt_connectivity_and_scores(
533 &self,
534 positions: Vec<AuthorityName>,
535 ) -> (usize, usize, usize) {
536 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
537 if low_scoring_authorities.get(&self.authority).is_some() {
538 return (positions.len(), 0, 0);
539 }
540 let initial_position = get_position_in_list(self.authority, positions.clone());
541 let mut preceding_disconnected = 0;
542 let mut before_our_position = true;
543
544 let filtered_positions: Vec<_> = positions
545 .into_iter()
546 .filter(|authority| {
547 let keep = self.authority == *authority; if keep {
549 before_our_position = false;
550 }
551
552 let connected = self
554 .connection_monitor_status
555 .check_connection(&self.authority, authority)
556 .unwrap_or(ConnectionStatus::Disconnected)
557 == ConnectionStatus::Connected;
558 if !connected && before_our_position {
559 preceding_disconnected += 1; }
561
562 let high_scoring = low_scoring_authorities.get(authority).is_none();
564
565 keep || (connected && high_scoring)
566 })
567 .collect();
568
569 let position = get_position_in_list(self.authority, filtered_positions);
570
571 (
572 position,
573 initial_position - position,
574 preceding_disconnected,
575 )
576 }
577
578 pub fn submit(
589 self: &Arc<Self>,
590 transaction: ConsensusTransaction,
591 lock: Option<&RwLockReadGuard<ReconfigState>>,
592 epoch_store: &Arc<AuthorityPerEpochStore>,
593 ) -> IotaResult<JoinHandle<()>> {
594 self.submit_batch(&[transaction], lock, epoch_store)
595 }
596
597 pub fn submit_batch(
598 self: &Arc<Self>,
599 transactions: &[ConsensusTransaction],
600 lock: Option<&RwLockReadGuard<ReconfigState>>,
601 epoch_store: &Arc<AuthorityPerEpochStore>,
602 ) -> IotaResult<JoinHandle<()>> {
603 if transactions.len() > 1 {
604 for transaction in transactions {
608 fp_ensure!(
609 matches!(
610 transaction.kind,
611 ConsensusTransactionKind::UserTransaction(_)
612 ),
613 IotaError::InvalidTxKindInSoftBundle
614 );
615 }
616 }
617
618 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
619 Ok(self.submit_unchecked(transactions, epoch_store))
620 }
621
622 pub fn check_limits(&self) -> bool {
625 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
627 > self.max_pending_transactions
628 {
629 return false;
630 }
631 self.submit_semaphore.available_permits() > 0
633 }
634
635 pub(crate) fn check_consensus_overload(&self) -> IotaResult {
636 fp_ensure!(
637 self.check_limits(),
638 IotaError::TooManyTransactionsPendingConsensus
639 );
640 Ok(())
641 }
642
643 fn submit_unchecked(
644 self: &Arc<Self>,
645 transactions: &[ConsensusTransaction],
646 epoch_store: &Arc<AuthorityPerEpochStore>,
647 ) -> JoinHandle<()> {
648 let async_stage = self
651 .clone()
652 .submit_and_wait(transactions.to_vec(), epoch_store.clone());
653 let join_handle = spawn_monitored_task!(async_stage);
656 join_handle
657 }
658
659 async fn submit_and_wait(
660 self: Arc<Self>,
661 transactions: Vec<ConsensusTransaction>,
662 epoch_store: Arc<AuthorityPerEpochStore>,
663 ) {
664 epoch_store
678 .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
679 .await
680 .ok(); }
683
684 async fn submit_and_wait_inner(
685 self: Arc<Self>,
686 transactions: Vec<ConsensusTransaction>,
687 epoch_store: &Arc<AuthorityPerEpochStore>,
688 ) {
689 if transactions.is_empty() {
690 return;
691 }
692
693 let is_soft_bundle = transactions.len() > 1;
700
701 let mut transaction_keys = Vec::new();
702
703 for transaction in &transactions {
704 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
705 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
706 epoch_store.record_epoch_pending_certs_process_time_metric();
707 }
708
709 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
710 transaction_keys.push(transaction_key);
711 }
712 let tx_type = if !is_soft_bundle {
713 classify(&transactions[0])
714 } else {
715 "soft_bundle"
716 };
717
718 let mut guard = InflightDropGuard::acquire(&self, tx_type);
719
720 let (await_submit, position, positions_moved, preceding_disconnected) =
722 self.await_submit_delay(epoch_store.committee(), &transactions[..]);
723
724 let processed_via_consensus_or_checkpoint =
727 self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
728 pin_mut!(processed_via_consensus_or_checkpoint);
729
730 let processed_waiter = tokio::select! {
731 _ = await_submit => Some(processed_via_consensus_or_checkpoint),
733
734 _ = epoch_store.user_certs_closed_notify() => {
736 warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
737 Some(processed_via_consensus_or_checkpoint)
738 }
739
740 _ = &mut processed_via_consensus_or_checkpoint => {
742 None
743 }
744 };
745
746 let _monitor = if !is_soft_bundle
748 && matches!(
749 transactions[0].kind,
750 ConsensusTransactionKind::EndOfPublish(_)
751 | ConsensusTransactionKind::CapabilityNotificationV1(_)
752 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
753 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
754 ) {
755 let transaction_keys = transaction_keys.clone();
756 Some(CancelOnDrop(spawn_monitored_task!(async {
757 let mut i = 0u64;
758 loop {
759 i += 1;
760 const WARN_DELAY_S: u64 = 30;
761 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
762 let total_wait = i * WARN_DELAY_S;
763 warn!(
764 "Still waiting {} seconds for transactions {:?} to commit in consensus",
765 total_wait, transaction_keys
766 );
767 }
768 })))
769 } else {
770 None
771 };
772
773 if let Some(processed_waiter) = processed_waiter {
774 debug!("Submitting {:?} to consensus", transaction_keys);
775
776 guard.position = Some(position);
779 guard.positions_moved = Some(positions_moved);
780 guard.preceding_disconnected = Some(preceding_disconnected);
781
782 let _permit: SemaphorePermit = self
783 .submit_semaphore
784 .acquire()
785 .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
786 .await
787 .expect("Consensus adapter does not close semaphore");
788 let _in_flight_submission_guard =
789 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
790
791 let submit_inner = async {
795 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
796
797 loop {
798 let status_waiter = self
801 .submit_inner(
802 &transactions,
803 epoch_store,
804 &transaction_keys,
805 tx_type,
806 is_soft_bundle,
807 )
808 .await;
809
810 match status_waiter.await {
811 Ok(BlockStatus::Sequenced(_)) => {
812 self.metrics
813 .sequencing_certificate_status
814 .with_label_values(&[tx_type, "sequenced"])
815 .inc();
816 trace!(
819 "Transaction {transaction_keys:?} has been sequenced by consensus."
820 );
821 break;
822 }
823 Ok(BlockStatus::GarbageCollected(_)) => {
824 self.metrics
825 .sequencing_certificate_status
826 .with_label_values(&[tx_type, "garbage_collected"])
827 .inc();
828 debug!(
836 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
837 );
838 time::sleep(RETRY_DELAY_STEP).await;
839 continue;
840 }
841 Err(err) => {
842 warn!(
843 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
844 err
845 );
846 time::sleep(RETRY_DELAY_STEP).await;
847 continue;
848 }
849 }
850 }
851 };
852
853 guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
854 Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
855 Either::Right(((), processed_waiter)) => {
856 debug!("Submitted {transaction_keys:?} to consensus");
857 processed_waiter.await
858 }
859 };
860 }
861 debug!("{transaction_keys:?} processed by consensus");
862
863 let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
864 epoch_store
865 .remove_pending_consensus_transactions(&consensus_keys)
866 .expect("Storage error when removing consensus transaction");
867
868 let is_user_tx = is_soft_bundle
869 || matches!(
870 transactions[0].kind,
871 ConsensusTransactionKind::UserTransaction(_)
872 );
873 let send_end_of_publish = if is_user_tx {
874 if epoch_store
884 .get_reconfig_state_read_lock_guard()
885 .is_reject_user_certs()
886 {
887 let pending_count = epoch_store.pending_consensus_certificates_count();
888 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
889 pending_count == 0 } else {
891 false
892 }
893 } else {
894 false
895 };
896 if send_end_of_publish {
897 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
899 if let Err(err) = self.submit(
900 ConsensusTransaction::new_end_of_publish(self.authority),
901 None,
902 epoch_store,
903 ) {
904 warn!("Error when sending end of publish message: {:?}", err);
905 }
906 }
907 self.metrics
908 .sequencing_certificate_success
909 .with_label_values(&[tx_type])
910 .inc();
911 }
912
913 async fn submit_inner(
914 self: &Arc<Self>,
915 transactions: &[ConsensusTransaction],
916 epoch_store: &Arc<AuthorityPerEpochStore>,
917 transaction_keys: &[SequencedConsensusTransactionKey],
918 tx_type: &str,
919 is_soft_bundle: bool,
920 ) -> BlockStatusReceiver {
921 let ack_start = Instant::now();
922 let mut retries: u32 = 0;
923
924 let status_waiter = loop {
925 match self
926 .consensus_client
927 .submit(transactions, epoch_store)
928 .await
929 {
930 Err(err) => {
931 if retries > 30
934 || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
935 {
936 warn!(
937 "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
938 );
939 }
940 self.metrics
941 .sequencing_certificate_failures
942 .with_label_values(&[tx_type])
943 .inc();
944 retries += 1;
945
946 if !is_soft_bundle && transactions[0].kind.is_dkg() {
947 time::sleep(Duration::from_millis(100)).await;
950 } else {
951 time::sleep(Duration::from_secs(10)).await;
952 };
953 }
954 Ok(status_waiter) => {
955 break status_waiter;
956 }
957 }
958 };
959
960 let bucket = match retries {
964 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
966 21..=50 => "between_20_and_50".to_string(),
967 51..=100 => "between_50_and_100".to_string(),
968 _ => "over_100".to_string(),
969 };
970
971 self.metrics
972 .sequencing_acknowledge_latency
973 .with_label_values(&[bucket.as_str(), tx_type])
974 .observe(ack_start.elapsed().as_secs_f64());
975
976 status_waiter
977 }
978
979 async fn await_consensus_or_checkpoint(
984 self: &Arc<Self>,
985 transaction_keys: Vec<SequencedConsensusTransactionKey>,
986 epoch_store: &Arc<AuthorityPerEpochStore>,
987 ) -> ProcessedMethod {
988 let notifications = FuturesUnordered::new();
989 for transaction_key in transaction_keys {
990 let transaction_digests = if let SequencedConsensusTransactionKey::External(
991 ConsensusTransactionKey::Certificate(digest),
992 ) = transaction_key
993 {
994 vec![digest]
995 } else {
996 vec![]
997 };
998
999 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
1000 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
1001 ) = transaction_key
1002 {
1003 Either::Left(epoch_store.synced_checkpoint_notify(checkpoint_sequence_number))
1008 } else {
1009 Either::Right(future::pending())
1010 };
1011
1012 notifications.push(async move {
1019 tokio::select! {
1020 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
1021 processed.expect("Storage error when waiting for consensus message processed");
1022 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
1023 return ProcessedMethod::Consensus;
1024 },
1025 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
1026 processed.expect("Storage error when waiting for transaction executed in checkpoint");
1027 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
1028 }
1029 processed = checkpoint_synced_future => {
1030 processed.expect("Error when waiting for checkpoint sequence number");
1031 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
1032 }
1033 }
1034 ProcessedMethod::Checkpoint
1035 });
1036 }
1037
1038 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
1039 for method in processed_methods {
1040 if method == ProcessedMethod::Checkpoint {
1041 return ProcessedMethod::Checkpoint;
1042 }
1043 }
1044 ProcessedMethod::Consensus
1045 }
1046}
1047
1048impl CheckConnection for ConnectionMonitorStatus {
1049 fn check_connection(
1050 &self,
1051 ourself: &AuthorityName,
1052 authority: &AuthorityName,
1053 ) -> Option<ConnectionStatus> {
1054 if ourself == authority {
1055 return Some(ConnectionStatus::Connected);
1056 }
1057
1058 let mapping = self.authority_names_to_peer_ids.load_full();
1059 let peer_id = match mapping.get(authority) {
1060 Some(p) => p,
1061 None => {
1062 warn!(
1063 "failed to find peer {:?} in connection monitor listener",
1064 authority
1065 );
1066 return None;
1067 }
1068 };
1069
1070 let res = match self.connection_statuses.try_get(peer_id) {
1071 TryResult::Present(c) => Some(c.value().clone()),
1072 TryResult::Absent => None,
1073 TryResult::Locked => {
1074 Some(ConnectionStatus::Disconnected)
1076 }
1077 };
1078 res
1079 }
1080 fn update_mapping_for_epoch(
1081 &self,
1082 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1083 ) {
1084 self.authority_names_to_peer_ids
1085 .swap(Arc::new(authority_names_to_peer_ids));
1086 }
1087}
1088
1089impl CheckConnection for ConnectionMonitorStatusForTests {
1090 fn check_connection(
1091 &self,
1092 _ourself: &AuthorityName,
1093 _authority: &AuthorityName,
1094 ) -> Option<ConnectionStatus> {
1095 Some(ConnectionStatus::Connected)
1096 }
1097 fn update_mapping_for_epoch(
1098 &self,
1099 _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1100 ) {
1101 }
1102}
1103
1104pub fn get_position_in_list(
1105 search_authority: AuthorityName,
1106 positions: Vec<AuthorityName>,
1107) -> usize {
1108 positions
1109 .into_iter()
1110 .find_position(|authority| *authority == search_authority)
1111 .expect("Couldn't find ourselves in shuffled committee")
1112 .0
1113}
1114
1115impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1116 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1121 let send_end_of_publish = {
1122 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1123 if !reconfig_guard.should_accept_user_certs() {
1124 return;
1126 }
1127 let pending_count = epoch_store.pending_consensus_certificates_count();
1128 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
1129 let send_end_of_publish = pending_count == 0;
1130 epoch_store.close_user_certs(reconfig_guard);
1131 send_end_of_publish
1132 };
1134 if send_end_of_publish {
1135 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1136 if let Err(err) = self.submit(
1137 ConsensusTransaction::new_end_of_publish(self.authority),
1138 None,
1139 epoch_store,
1140 ) {
1141 warn!("Error when sending end of publish message: {:?}", err);
1142 }
1143 }
1144 }
1145}
1146
1147struct CancelOnDrop<T>(JoinHandle<T>);
1148
1149impl<T> Deref for CancelOnDrop<T> {
1150 type Target = JoinHandle<T>;
1151
1152 fn deref(&self) -> &Self::Target {
1153 &self.0
1154 }
1155}
1156
1157impl<T> Drop for CancelOnDrop<T> {
1158 fn drop(&mut self) {
1159 self.0.abort();
1160 }
1161}
1162
1163struct InflightDropGuard<'a> {
1165 adapter: &'a ConsensusAdapter,
1166 start: Instant,
1167 position: Option<usize>,
1168 positions_moved: Option<usize>,
1169 preceding_disconnected: Option<usize>,
1170 tx_type: &'static str,
1171 processed_method: ProcessedMethod,
1172}
1173
/// How a submitted transaction was observed to be processed.
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    /// Processed as a consensus message.
    Consensus,
    /// Observed through a checkpoint (executed or synced).
    Checkpoint,
}
1179
1180impl<'a> InflightDropGuard<'a> {
1181 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1182 adapter
1183 .num_inflight_transactions
1184 .fetch_add(1, Ordering::SeqCst);
1185 adapter
1186 .metrics
1187 .sequencing_certificate_inflight
1188 .with_label_values(&[tx_type])
1189 .inc();
1190 adapter
1191 .metrics
1192 .sequencing_certificate_attempt
1193 .with_label_values(&[tx_type])
1194 .inc();
1195 Self {
1196 adapter,
1197 start: Instant::now(),
1198 position: None,
1199 positions_moved: None,
1200 preceding_disconnected: None,
1201 tx_type,
1202 processed_method: ProcessedMethod::Consensus,
1203 }
1204 }
1205}
1206
1207impl Drop for InflightDropGuard<'_> {
1208 fn drop(&mut self) {
1209 self.adapter
1210 .num_inflight_transactions
1211 .fetch_sub(1, Ordering::SeqCst);
1212 self.adapter
1213 .metrics
1214 .sequencing_certificate_inflight
1215 .with_label_values(&[self.tx_type])
1216 .dec();
1217
1218 let position = if let Some(position) = self.position {
1219 self.adapter
1220 .metrics
1221 .sequencing_certificate_authority_position
1222 .observe(position as f64);
1223 position.to_string()
1224 } else {
1225 "not_submitted".to_string()
1226 };
1227
1228 if let Some(positions_moved) = self.positions_moved {
1229 self.adapter
1230 .metrics
1231 .sequencing_certificate_positions_moved
1232 .observe(positions_moved as f64);
1233 };
1234
1235 if let Some(preceding_disconnected) = self.preceding_disconnected {
1236 self.adapter
1237 .metrics
1238 .sequencing_certificate_preceding_disconnected
1239 .observe(preceding_disconnected as f64);
1240 };
1241
1242 let latency = self.start.elapsed();
1243 let processed_method = match self.processed_method {
1244 ProcessedMethod::Consensus => "processed_via_consensus",
1245 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1246 };
1247 self.adapter
1248 .metrics
1249 .sequencing_certificate_latency
1250 .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1251 .observe(latency.as_secs_f64());
1252
1253 if self.position == Some(0) {
1259 let sampled = matches!(
1262 self.tx_type,
1263 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1264 );
1265 if sampled && self.processed_method == ProcessedMethod::Consensus {
1268 self.adapter.latency_observer.report(latency);
1269 }
1270 }
1271 }
1272}
1273
1274#[async_trait::async_trait]
1275impl SubmitToConsensus for Arc<ConsensusAdapter> {
1276 async fn submit_to_consensus(
1277 &self,
1278 transactions: &[ConsensusTransaction],
1279 epoch_store: &Arc<AuthorityPerEpochStore>,
1280 ) -> IotaResult {
1281 self.submit_batch(transactions, None, epoch_store)
1282 .map(|_| ())
1283 }
1284}
1285
1286pub fn position_submit_certificate(
1287 committee: &Committee,
1288 ourselves: &AuthorityName,
1289 tx_digest: &TransactionDigest,
1290) -> usize {
1291 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1292 get_position_in_list(*ourselves, validators)
1293}
1294
#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use prometheus::Registry;
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        mysticeti_adapter::LazyMysticetiClient,
    };

    /// Builds a test committee of `size` authorities whose keys and stakes
    /// (each in `0..10`) are drawn from `rng`.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    /// Checks the position and delay reported by the adapter with and without
    /// a maximum submit position and delay-step override.
    ///
    /// `positions_moved` is asserted to be exactly zero. The earlier form
    /// `assert!(!positions_moved > 0)` applied a bitwise NOT to the integer
    /// and so passed for every value except the type's maximum.
    /// NOTE(review): zero is expected on the assumption that
    /// `ConnectionMonitorStatusForTests` reports every peer as connected —
    /// confirm.
    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // Fixed seed so the committee and digest are reproducible.
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // Adapter with a max submit position of 1 and a 2s delay step.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        let tx_digest = TransactionDigest::generate(&mut rng);

        // The raw submission position is above the configured maximum.
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        // With the overrides in place the position and delay are capped.
        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Same adapter without a max submit position or delay-step override.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }

    /// For many random digests, every committee member gets a position below
    /// the committee size and exactly one member gets position 0.
    #[test]
    fn test_position_submit_certificate() {
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // Position 0 must not be handed out twice.
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }

    /// Creating the metrics twice against the same registry is expected to
    /// panic (presumably on the second, duplicate registration).
    #[tokio::test]
    #[should_panic]
    async fn test_reregister_consensus_adapter_metrics() {
        let registry = Registry::new();
        let _metrics = ConsensusAdapterMetrics::new(&registry);
        let _metrics = ConsensusAdapterMetrics::new(&registry);
    }

    /// After `unregister`, the metrics can be created again on the same
    /// registry and the new counter starts from zero.
    #[tokio::test]
    async fn test_unregister_consensus_adapter_metrics() {
        let registry = Registry::new();

        let metrics = ConsensusAdapterMetrics::new(&registry);
        metrics
            .sequencing_certificate_attempt
            .with_label_values(&["tx"])
            .inc_by(1);
        assert_eq!(
            1,
            metrics
                .sequencing_certificate_attempt
                .with_label_values(&["tx"])
                .get()
        );
        metrics.unregister(&registry);
        // The old handle can still be incremented after unregistering; this
        // must not leak into the metrics created next.
        metrics
            .sequencing_certificate_attempt
            .with_label_values(&["tx"])
            .inc_by(1);

        let metrics = ConsensusAdapterMetrics::new(&registry);
        assert_eq!(
            0,
            metrics
                .sequencing_certificate_attempt
                .with_label_values(&["tx"])
                .get()
        );
        metrics.unregister(&registry);
    }
}