1use std::{
6 collections::HashMap,
7 future::Future,
8 ops::Deref,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use consensus_core::BlockStatus;
18use dashmap::{DashMap, try_result::TryResult};
19use futures::{
20 FutureExt, StreamExt,
21 future::{self, Either, select},
22 pin_mut,
23 stream::FuturesUnordered,
24};
25use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
26use iota_simulator::anemo::PeerId;
27use iota_types::{
28 base_types::{AuthorityName, TransactionDigest},
29 committee::Committee,
30 error::{IotaError, IotaResult},
31 fp_ensure,
32 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
33};
34use itertools::Itertools;
35use parking_lot::RwLockReadGuard;
36use prometheus::{
37 Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
38 register_histogram_vec_with_registry, register_histogram_with_registry,
39 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
40 register_int_gauge_with_registry,
41};
42use tokio::{
43 sync::{Semaphore, SemaphorePermit, oneshot},
44 task::JoinHandle,
45 time::{
46 Duration, {self},
47 },
48};
49use tracing::{debug, info, trace, warn};
50
51use crate::{
52 authority::authority_per_epoch_store::AuthorityPerEpochStore,
53 connection_monitor::ConnectionStatus,
54 consensus_handler::{SequencedConsensusTransactionKey, classify},
55 epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
56 metrics::LatencyObserver,
57};
58
59#[cfg(test)]
60#[path = "unit_tests/consensus_tests.rs"]
61pub mod consensus_tests;
62
/// Histogram bucket boundaries used by the position-related metrics
/// (`sequencing_certificate_authority_position`,
/// `sequencing_certificate_positions_moved` and
/// `sequencing_certificate_preceding_disconnected`).
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];
66
/// Prometheus metrics recorded by the consensus adapter.
///
/// The metric names and help strings are defined in
/// [`ConsensusAdapterMetrics::new`].
pub struct ConsensusAdapterMetrics {
    /// Number of sequencing attempts, labelled by `tx_type`.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Number of successfully sequenced submissions, labelled by `tx_type`.
    pub sequencing_certificate_success: IntCounterVec,
    /// Number of failed submit calls, labelled by `tx_type`.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Block status reported by consensus, labelled by `tx_type` and
    /// `status` (`"sequenced"` or `"garbage_collected"`).
    pub sequencing_certificate_status: IntCounterVec,
    /// Number of submissions currently in flight, labelled by `tx_type`.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Time until the consensus client accepts a submission, labelled by a
    /// retry bucket and `tx_type`.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// End-to-end sequencing latency, labelled by `position`, `tx_type` and
    /// `processed_method`.
    pub sequencing_certificate_latency: HistogramVec,
    /// Position of this authority at submission time.
    pub sequencing_certificate_authority_position: Histogram,
    /// How many positions this authority moved up after filtering.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Number of disconnected authorities that preceded this one.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Number of processed notifications, labelled by `source`.
    pub sequencing_certificate_processed: IntCounterVec,
    /// Number of tasks currently waiting for a submit semaphore permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Number of submissions holding a semaphore permit.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Latest latency estimate in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Latest delay step in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
}
85
86impl ConsensusAdapterMetrics {
    /// Registers every consensus adapter metric on `registry` and returns the
    /// populated metrics struct.
    ///
    /// # Panics
    ///
    /// Panics (via `unwrap`) if any registration returns an error.
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            // Labels: retry bucket first, then transaction type.
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
                &["retry", "tx_type"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when submitted a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            )
            .unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to local consensus instance and not yet sequenced",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
        }
    }
194
195 pub fn new_test() -> Self {
196 Self::new(&Registry::default())
197 }
198}
199
/// One-shot channel on which a [`ConsensusClient`] delivers the
/// [`BlockStatus`] of a submission.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
201
/// Synchronous entry point for handing transactions to consensus.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` to consensus for the epoch represented by
    /// `epoch_store`.
    ///
    /// Returns an error if the submission could not be accepted.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}
210
/// Asynchronous client for the underlying consensus engine.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` to consensus.
    ///
    /// On success returns a receiver that later yields the [`BlockStatus`]
    /// of the submission (sequenced or garbage collected).
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
220
/// Submits transactions to consensus and tracks them until they have been
/// processed.
pub struct ConsensusAdapter {
    /// Client used to hand transactions to the consensus engine.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Name of this authority.
    authority: AuthorityName,
    /// Upper bound for `num_inflight_transactions` checked in `check_limits`.
    max_pending_transactions: usize,
    /// Number of submissions currently tracked by an `InflightDropGuard`.
    num_inflight_transactions: AtomicU64,
    /// Optional cap applied to the computed submission position.
    max_submit_position: Option<usize>,
    /// When set, replaces the latency-derived delay step.
    submit_delay_step_override: Option<Duration>,
    /// Source of connection statuses used when computing the submission
    /// position.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// Authorities currently considered low scoring; they are filtered out
    /// when computing the submission position.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// Metrics recorded by this adapter.
    metrics: ConsensusAdapterMetrics,
    /// Limits the number of concurrent submissions to the consensus client.
    submit_semaphore: Semaphore,
    /// Provides the latency estimate and receives new latency samples.
    latency_observer: LatencyObserver,
}
249
/// Abstraction over the source of peer connection statuses.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status of `authority` as seen by `ourself`, or
    /// `None` when no status is known.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Replaces the authority-name-to-peer-id mapping.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
258
/// [`CheckConnection`] implementation backed by a shared status map.
pub struct ConnectionMonitorStatus {
    /// Connection status per peer id.
    /// NOTE(review): presumably populated by the connection monitor — confirm.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// Mapping from authority name to peer id, replaceable at runtime.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
265
/// [`CheckConnection`] implementation that reports every authority as
/// connected.
pub struct ConnectionMonitorStatusForTests {}
267
268impl ConsensusAdapter {
269 pub fn new(
271 consensus_client: Arc<dyn ConsensusClient>,
272 authority: AuthorityName,
273 connection_monitor_status: Arc<dyn CheckConnection>,
274 max_pending_transactions: usize,
275 max_pending_local_submissions: usize,
276 max_submit_position: Option<usize>,
277 submit_delay_step_override: Option<Duration>,
278 metrics: ConsensusAdapterMetrics,
279 ) -> Self {
280 let num_inflight_transactions = Default::default();
281 let low_scoring_authorities =
282 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
283 Self {
284 consensus_client,
285 authority,
286 max_pending_transactions,
287 max_submit_position,
288 submit_delay_step_override,
289 num_inflight_transactions,
290 connection_monitor_status,
291 low_scoring_authorities,
292 metrics,
293 submit_semaphore: Semaphore::new(max_pending_local_submissions),
294 latency_observer: LatencyObserver::new(),
295 }
296 }
297
298 pub fn swap_low_scoring_authorities(
299 &self,
300 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
301 ) {
302 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
303 }
304
305 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
308 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
314
315 #[expect(clippy::collapsible_if)] if epoch_store
317 .get_reconfig_state_read_lock_guard()
318 .is_reject_user_certs()
319 && epoch_store.pending_consensus_certificates_empty()
320 {
321 if recovered
322 .iter()
323 .any(ConsensusTransaction::is_end_of_publish)
324 {
325 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
334 }
335 }
336 debug!(
337 "Submitting {:?} recovered pending consensus transactions to consensus",
338 recovered.len()
339 );
340 for transaction in recovered {
341 if transaction.is_end_of_publish() {
342 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
343 }
344 self.submit_unchecked(&[transaction], epoch_store);
345 }
346 }
347
348 fn await_submit_delay(
349 &self,
350 committee: &Committee,
351 transactions: &[ConsensusTransaction],
352 ) -> (impl Future<Output = ()>, usize, usize, usize) {
353 let min_digest = transactions
355 .iter()
356 .filter_map(|tx| match &tx.kind {
357 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
358 Some(certificate.digest())
359 }
360 _ => None,
361 })
362 .min();
363
364 let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
365 Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
366 _ => (Duration::ZERO, 0, 0, 0),
367 };
368 (
369 tokio::time::sleep(duration),
370 position,
371 positions_moved,
372 preceding_disconnected,
373 )
374 }
375
376 fn await_submit_delay_user_transaction(
377 &self,
378 committee: &Committee,
379 tx_digest: &TransactionDigest,
380 ) -> (Duration, usize, usize, usize) {
381 let (position, positions_moved, preceding_disconnected) =
382 self.submission_position(committee, tx_digest);
383
384 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
386 const MAX_LATENCY: Duration = Duration::from_secs(3);
387
388 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
389 self.metrics
390 .sequencing_estimated_latency
391 .set(latency.as_millis() as i64);
392
393 let latency = std::cmp::max(latency, MIN_LATENCY);
394 let latency = std::cmp::min(latency, MAX_LATENCY);
395 let latency = latency * 2;
396 let (delay_step, position) =
397 self.override_by_max_submit_position_settings(latency, position);
398
399 self.metrics
400 .sequencing_resubmission_interval_ms
401 .set(delay_step.as_millis() as i64);
402
403 (
404 delay_step * position as u32,
405 position,
406 positions_moved,
407 preceding_disconnected,
408 )
409 }
410
411 fn override_by_max_submit_position_settings(
417 &self,
418 latency: Duration,
419 mut position: usize,
420 ) -> (Duration, usize) {
421 if let Some(max_submit_position) = self.max_submit_position {
423 position = std::cmp::min(position, max_submit_position);
424 }
425
426 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
427 (delay_step, position)
428 }
429
430 fn submission_position(
440 &self,
441 committee: &Committee,
442 tx_digest: &TransactionDigest,
443 ) -> (usize, usize, usize) {
444 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
445
446 self.check_submission_wrt_connectivity_and_scores(positions)
447 }
448
449 fn check_submission_wrt_connectivity_and_scores(
476 &self,
477 positions: Vec<AuthorityName>,
478 ) -> (usize, usize, usize) {
479 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
480 if low_scoring_authorities.get(&self.authority).is_some() {
481 return (positions.len(), 0, 0);
482 }
483 let initial_position = get_position_in_list(self.authority, positions.clone());
484 let mut preceding_disconnected = 0;
485 let mut before_our_position = true;
486
487 let filtered_positions: Vec<_> = positions
488 .into_iter()
489 .filter(|authority| {
490 let keep = self.authority == *authority; if keep {
492 before_our_position = false;
493 }
494
495 let connected = self
497 .connection_monitor_status
498 .check_connection(&self.authority, authority)
499 .unwrap_or(ConnectionStatus::Disconnected)
500 == ConnectionStatus::Connected;
501 if !connected && before_our_position {
502 preceding_disconnected += 1; }
504
505 let high_scoring = low_scoring_authorities.get(authority).is_none();
507
508 keep || (connected && high_scoring)
509 })
510 .collect();
511
512 let position = get_position_in_list(self.authority, filtered_positions);
513
514 (
515 position,
516 initial_position - position,
517 preceding_disconnected,
518 )
519 }
520
521 pub fn submit(
532 self: &Arc<Self>,
533 transaction: ConsensusTransaction,
534 lock: Option<&RwLockReadGuard<ReconfigState>>,
535 epoch_store: &Arc<AuthorityPerEpochStore>,
536 ) -> IotaResult<JoinHandle<()>> {
537 self.submit_batch(&[transaction], lock, epoch_store)
538 }
539
540 pub fn submit_batch(
541 self: &Arc<Self>,
542 transactions: &[ConsensusTransaction],
543 lock: Option<&RwLockReadGuard<ReconfigState>>,
544 epoch_store: &Arc<AuthorityPerEpochStore>,
545 ) -> IotaResult<JoinHandle<()>> {
546 if transactions.len() > 1 {
547 for transaction in transactions {
551 fp_ensure!(
552 matches!(
553 transaction.kind,
554 ConsensusTransactionKind::CertifiedTransaction(_)
555 ),
556 IotaError::InvalidTxKindInSoftBundle
557 );
558 }
559 }
560
561 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
562 Ok(self.submit_unchecked(transactions, epoch_store))
563 }
564
565 pub fn check_limits(&self) -> bool {
568 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
570 > self.max_pending_transactions
571 {
572 return false;
573 }
574 self.submit_semaphore.available_permits() > 0
576 }
577
578 pub(crate) fn check_consensus_overload(&self) -> IotaResult {
579 fp_ensure!(
580 self.check_limits(),
581 IotaError::TooManyTransactionsPendingConsensus
582 );
583 Ok(())
584 }
585
586 fn submit_unchecked(
587 self: &Arc<Self>,
588 transactions: &[ConsensusTransaction],
589 epoch_store: &Arc<AuthorityPerEpochStore>,
590 ) -> JoinHandle<()> {
591 let async_stage = self
594 .clone()
595 .submit_and_wait(transactions.to_vec(), epoch_store.clone());
596 let join_handle = spawn_monitored_task!(async_stage);
599 join_handle
600 }
601
602 async fn submit_and_wait(
603 self: Arc<Self>,
604 transactions: Vec<ConsensusTransaction>,
605 epoch_store: Arc<AuthorityPerEpochStore>,
606 ) {
607 epoch_store
621 .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
622 .await
623 .ok(); }
626
    /// Submits `transactions` to consensus (after the position-based delay)
    /// and waits until they have been processed via consensus or checkpoint.
    ///
    /// Steps:
    /// 1. Compute the transaction keys and the metrics label (`tx_type`).
    /// 2. Wait for the submit delay, the closing of user certs, or the
    ///    transactions being processed, whichever happens first.
    /// 3. Unless already processed, acquire a submit permit and submit in a
    ///    retry loop until consensus reports the block as sequenced or the
    ///    transactions are observed as processed.
    /// 4. Remove the transactions from the pending table and, for user
    ///    transactions, send `EndOfPublish` when user certs are rejected and
    ///    no certificates remain pending.
    ///
    /// Returns immediately when `transactions` is empty.
    ///
    /// # Panics
    ///
    /// Panics if removing the pending transactions from storage fails.
    async fn submit_and_wait_inner(
        self: Arc<Self>,
        transactions: Vec<ConsensusTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        if transactions.is_empty() {
            return;
        }

        // More than one transaction is treated as a soft bundle.
        let is_soft_bundle = transactions.len() > 1;

        let mut transaction_keys = Vec::new();

        for transaction in &transactions {
            if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
                info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
                epoch_store.record_epoch_pending_certs_process_time_metric();
            }

            let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
            transaction_keys.push(transaction_key);
        }
        let tx_type = if !is_soft_bundle {
            classify(&transactions[0])
        } else {
            "soft_bundle"
        };

        // Tracks the in-flight count and records latency metrics when dropped.
        let mut guard = InflightDropGuard::acquire(&self, tx_type);

        let (await_submit, position, positions_moved, preceding_disconnected) =
            self.await_submit_delay(epoch_store.committee(), &transactions[..]);

        let processed_via_consensus_or_checkpoint =
            self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
        pin_mut!(processed_via_consensus_or_checkpoint);

        // `Some(waiter)` means we still have to submit; `None` means the
        // transactions were processed while we were waiting.
        let processed_waiter = tokio::select! {
            // The submit delay elapsed.
            _ = await_submit => Some(processed_via_consensus_or_checkpoint),

            // User certs were closed: skip the remaining delay.
            _ = epoch_store.user_certs_closed_notify() => {
                warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
                Some(processed_via_consensus_or_checkpoint)
            }

            // Already processed; nothing to submit.
            _ = &mut processed_via_consensus_or_checkpoint => {
                None
            }
        };

        // For selected system messages, periodically warn while we are still
        // waiting. The task is aborted when `_monitor` is dropped.
        let _monitor = if !is_soft_bundle
            && matches!(
                transactions[0].kind,
                ConsensusTransactionKind::EndOfPublish(_)
                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
            ) {
            let transaction_keys = transaction_keys.clone();
            Some(CancelOnDrop(spawn_monitored_task!(async {
                let mut i = 0u64;
                loop {
                    i += 1;
                    const WARN_DELAY_S: u64 = 30;
                    tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                    let total_wait = i * WARN_DELAY_S;
                    warn!(
                        "Still waiting {} seconds for transactions {:?} to commit in consensus",
                        total_wait, transaction_keys
                    );
                }
            })))
        } else {
            None
        };

        if let Some(processed_waiter) = processed_waiter {
            debug!("Submitting {:?} to consensus", transaction_keys);

            // Position data is only recorded when we actually submit.
            guard.position = Some(position);
            guard.positions_moved = Some(positions_moved);
            guard.preceding_disconnected = Some(preceding_disconnected);

            // Limit the number of concurrent submissions.
            let _permit: SemaphorePermit = self
                .submit_semaphore
                .acquire()
                .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
                .await
                .expect("Consensus adapter does not close semaphore");
            let _in_flight_submission_guard =
                GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

            // Submit and resubmit until consensus reports the block as
            // sequenced.
            let submit_inner = async {
                const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

                loop {
                    let status_waiter = self
                        .submit_inner(
                            &transactions,
                            epoch_store,
                            &transaction_keys,
                            tx_type,
                            is_soft_bundle,
                        )
                        .await;

                    match status_waiter.await {
                        Ok(BlockStatus::Sequenced(_)) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "sequenced"])
                                .inc();
                            trace!(
                                "Transaction {transaction_keys:?} has been sequenced by consensus."
                            );
                            break;
                        }
                        Ok(BlockStatus::GarbageCollected(_)) => {
                            self.metrics
                                .sequencing_certificate_status
                                .with_label_values(&[tx_type, "garbage_collected"])
                                .inc();
                            debug!(
                                "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                        Err(err) => {
                            warn!(
                                "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                                err
                            );
                            time::sleep(RETRY_DELAY_STEP).await;
                            continue;
                        }
                    }
                }
            };

            // Whichever finishes first: processing observed, or submission
            // sequenced (in which case we still wait for processing).
            guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
                Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
                Either::Right(((), processed_waiter)) => {
                    debug!("Submitted {transaction_keys:?} to consensus");
                    processed_waiter.await
                }
            };
        }
        debug!("{transaction_keys:?} processed by consensus");

        let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
        epoch_store
            .remove_pending_consensus_transactions(&consensus_keys)
            .expect("Storage error when removing consensus transaction");

        let is_user_tx = is_soft_bundle
            || matches!(
                transactions[0].kind,
                ConsensusTransactionKind::CertifiedTransaction(_)
            );
        let send_end_of_publish = if is_user_tx {
            // Only relevant once user certs are rejected: if nothing is
            // pending any more, EndOfPublish must be sent.
            if epoch_store
                .get_reconfig_state_read_lock_guard()
                .is_reject_user_certs()
            {
                let pending_count = epoch_store.pending_consensus_certificates_count();
                debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
                pending_count == 0
            } else {
                false
            }
        } else {
            false
        };
        if send_end_of_publish {
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
        self.metrics
            .sequencing_certificate_success
            .with_label_values(&[tx_type])
            .inc();
    }
855
    /// Submits `transactions` through the consensus client, retrying until
    /// the client accepts them.
    ///
    /// Each failure increments the failure counter. A warning is logged after
    /// more than 3 retries for soft bundles and non-DKG transactions, and
    /// after more than 30 retries otherwise. The retry delay is 100ms for a
    /// single DKG transaction and 10s for everything else.
    ///
    /// Returns the receiver on which consensus reports the block status.
    async fn submit_inner(
        self: &Arc<Self>,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction_keys: &[SequencedConsensusTransactionKey],
        tx_type: &str,
        is_soft_bundle: bool,
    ) -> BlockStatusReceiver {
        let ack_start = Instant::now();
        let mut retries: u32 = 0;

        let status_waiter = loop {
            match self
                .consensus_client
                .submit(transactions, epoch_store)
                .await
            {
                Err(err) => {
                    // Stay quiet for the first few retries.
                    if retries > 30
                        || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
                    {
                        warn!(
                            "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
                        );
                    }
                    self.metrics
                        .sequencing_certificate_failures
                        .with_label_values(&[tx_type])
                        .inc();
                    retries += 1;

                    if !is_soft_bundle && transactions[0].kind.is_dkg() {
                        // Shorter retry delay for DKG messages.
                        time::sleep(Duration::from_millis(100)).await;
                    } else {
                        time::sleep(Duration::from_secs(10)).await;
                    };
                }
                Ok(status_waiter) => {
                    break status_waiter;
                }
            }
        };

        // Bucket the retry count to bound the label cardinality.
        let bucket = match retries {
            0..=10 => retries.to_string(),
            11..=20 => "between_10_and_20".to_string(),
            21..=50 => "between_20_and_50".to_string(),
            51..=100 => "between_50_and_100".to_string(),
            _ => "over_100".to_string(),
        };

        self.metrics
            .sequencing_acknowledge_latency
            .with_label_values(&[bucket.as_str(), tx_type])
            .observe(ack_start.elapsed().as_secs_f64());

        status_waiter
    }
921
    /// Waits until every key in `transaction_keys` has been processed.
    ///
    /// Each key is considered processed when the first of the following
    /// completes:
    /// - the consensus message was processed,
    /// - (certificate keys only) the transaction was executed in a checkpoint,
    /// - (checkpoint signature keys only) the referenced checkpoint was
    ///   synced.
    ///
    /// Returns `ProcessedMethod::Checkpoint` if at least one key was processed
    /// by one of the checkpoint paths, otherwise `ProcessedMethod::Consensus`
    /// (including when `transaction_keys` is empty).
    ///
    /// # Panics
    ///
    /// Panics if any of the awaited notifications returns an error.
    async fn await_consensus_or_checkpoint(
        self: &Arc<Self>,
        transaction_keys: Vec<SequencedConsensusTransactionKey>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ProcessedMethod {
        let notifications = FuturesUnordered::new();
        for transaction_key in transaction_keys {
            // Non-empty only for certificate keys.
            let transaction_digests = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::Certificate(digest),
            ) = transaction_key
            {
                vec![digest]
            } else {
                vec![]
            };

            // For other keys this future never resolves.
            let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
                ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
            ) = transaction_key
            {
                Either::Left(epoch_store.synced_checkpoint_notify(checkpoint_sequence_number))
            } else {
                Either::Right(future::pending())
            };

            notifications.push(async move {
                tokio::select! {
                    processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                        processed.expect("Storage error when waiting for consensus message processed");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                        return ProcessedMethod::Consensus;
                    },
                    // This branch is disabled when there are no digests to wait for.
                    processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                        processed.expect("Storage error when waiting for transaction executed in checkpoint");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                    }
                    processed = checkpoint_synced_future => {
                        processed.expect("Error when waiting for checkpoint sequence number");
                        self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                    }
                }
                // Reached by both checkpoint branches.
                ProcessedMethod::Checkpoint
            });
        }

        let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
        for method in processed_methods {
            if method == ProcessedMethod::Checkpoint {
                return ProcessedMethod::Checkpoint;
            }
        }
        ProcessedMethod::Consensus
    }
989}
990
991impl CheckConnection for ConnectionMonitorStatus {
992 fn check_connection(
993 &self,
994 ourself: &AuthorityName,
995 authority: &AuthorityName,
996 ) -> Option<ConnectionStatus> {
997 if ourself == authority {
998 return Some(ConnectionStatus::Connected);
999 }
1000
1001 let mapping = self.authority_names_to_peer_ids.load_full();
1002 let peer_id = match mapping.get(authority) {
1003 Some(p) => p,
1004 None => {
1005 warn!(
1006 "failed to find peer {:?} in connection monitor listener",
1007 authority
1008 );
1009 return None;
1010 }
1011 };
1012
1013 let res = match self.connection_statuses.try_get(peer_id) {
1014 TryResult::Present(c) => Some(c.value().clone()),
1015 TryResult::Absent => None,
1016 TryResult::Locked => {
1017 Some(ConnectionStatus::Disconnected)
1019 }
1020 };
1021 res
1022 }
1023 fn update_mapping_for_epoch(
1024 &self,
1025 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1026 ) {
1027 self.authority_names_to_peer_ids
1028 .swap(Arc::new(authority_names_to_peer_ids));
1029 }
1030}
1031
impl CheckConnection for ConnectionMonitorStatusForTests {
    /// Always reports the authority as connected.
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }
    /// No-op: the test implementation keeps no mapping.
    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}
1046
1047pub fn get_position_in_list(
1048 search_authority: AuthorityName,
1049 positions: Vec<AuthorityName>,
1050) -> usize {
1051 positions
1052 .into_iter()
1053 .find_position(|authority| *authority == search_authority)
1054 .expect("Couldn't find ourselves in shuffled committee")
1055 .0
1056}
1057
impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// Stops accepting user certificates for the epoch and, if no consensus
    /// certificates are pending, sends the `EndOfPublish` message.
    ///
    /// Does nothing when user certificates are already not accepted.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        let send_end_of_publish = {
            // The write guard is handed to `close_user_certs` below, so the
            // pending count is read while the guard is held.
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                return;
            }
            let pending_count = epoch_store.pending_consensus_certificates_count();
            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
            let send_end_of_publish = pending_count == 0;
            epoch_store.close_user_certs(reconfig_guard);
            send_end_of_publish
        };
        if send_end_of_publish {
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
    }
}
1089
/// Wrapper around a `JoinHandle` that aborts the task when dropped.
struct CancelOnDrop<T>(JoinHandle<T>);

impl<T> Deref for CancelOnDrop<T> {
    type Target = JoinHandle<T>;

    /// Gives access to the wrapped `JoinHandle`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> Drop for CancelOnDrop<T> {
    /// Aborts the wrapped task.
    fn drop(&mut self) {
        self.0.abort();
    }
}
1105
/// Tracks one in-flight submission; on drop it decrements the in-flight
/// counters and records the position and latency metrics.
struct InflightDropGuard<'a> {
    /// Adapter whose counters and metrics are updated.
    adapter: &'a ConsensusAdapter,
    /// Time at which the guard was acquired.
    start: Instant,
    /// Submission position, or `None` if nothing was submitted.
    position: Option<usize>,
    /// Number of positions moved due to filtering, if submitted.
    positions_moved: Option<usize>,
    /// Number of preceding disconnected authorities, if submitted.
    preceding_disconnected: Option<usize>,
    /// Metrics label describing the transaction type.
    tx_type: &'static str,
    /// How the transactions were observed as processed.
    processed_method: ProcessedMethod,
}
1116
/// How a submission was observed as processed.
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    /// Processed as a consensus message.
    Consensus,
    /// Observed through a checkpoint (executed in, or synced).
    Checkpoint,
}
1122
1123impl<'a> InflightDropGuard<'a> {
1124 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1125 adapter
1126 .num_inflight_transactions
1127 .fetch_add(1, Ordering::SeqCst);
1128 adapter
1129 .metrics
1130 .sequencing_certificate_inflight
1131 .with_label_values(&[tx_type])
1132 .inc();
1133 adapter
1134 .metrics
1135 .sequencing_certificate_attempt
1136 .with_label_values(&[tx_type])
1137 .inc();
1138 Self {
1139 adapter,
1140 start: Instant::now(),
1141 position: None,
1142 positions_moved: None,
1143 preceding_disconnected: None,
1144 tx_type,
1145 processed_method: ProcessedMethod::Consensus,
1146 }
1147 }
1148}
1149
1150impl Drop for InflightDropGuard<'_> {
1151 fn drop(&mut self) {
1152 self.adapter
1153 .num_inflight_transactions
1154 .fetch_sub(1, Ordering::SeqCst);
1155 self.adapter
1156 .metrics
1157 .sequencing_certificate_inflight
1158 .with_label_values(&[self.tx_type])
1159 .dec();
1160
1161 let position = if let Some(position) = self.position {
1162 self.adapter
1163 .metrics
1164 .sequencing_certificate_authority_position
1165 .observe(position as f64);
1166 position.to_string()
1167 } else {
1168 "not_submitted".to_string()
1169 };
1170
1171 if let Some(positions_moved) = self.positions_moved {
1172 self.adapter
1173 .metrics
1174 .sequencing_certificate_positions_moved
1175 .observe(positions_moved as f64);
1176 };
1177
1178 if let Some(preceding_disconnected) = self.preceding_disconnected {
1179 self.adapter
1180 .metrics
1181 .sequencing_certificate_preceding_disconnected
1182 .observe(preceding_disconnected as f64);
1183 };
1184
1185 let latency = self.start.elapsed();
1186 let processed_method = match self.processed_method {
1187 ProcessedMethod::Consensus => "processed_via_consensus",
1188 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1189 };
1190 self.adapter
1191 .metrics
1192 .sequencing_certificate_latency
1193 .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1194 .observe(latency.as_secs_f64());
1195
1196 if self.position == Some(0) {
1202 let sampled = matches!(
1205 self.tx_type,
1206 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1207 );
1208 if sampled && self.processed_method == ProcessedMethod::Consensus {
1211 self.adapter.latency_observer.report(latency);
1212 }
1213 }
1214 }
1215}
1216
1217impl SubmitToConsensus for Arc<ConsensusAdapter> {
1218 fn submit_to_consensus(
1219 &self,
1220 transactions: &[ConsensusTransaction],
1221 epoch_store: &Arc<AuthorityPerEpochStore>,
1222 ) -> IotaResult {
1223 self.submit_batch(transactions, None, epoch_store)
1224 .map(|_| ())
1225 }
1226}
1227
1228pub fn position_submit_certificate(
1229 committee: &Committee,
1230 ourselves: &AuthorityName,
1231 tx_digest: &TransactionDigest,
1232) -> usize {
1233 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1234 get_position_in_list(*ourselves, validators)
1235}
1236
#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        mysticeti_adapter::LazyMysticetiClient,
    };

    /// Builds a committee of `size` authorities with random keys and random
    /// (normalized) voting power drawn from `rng`.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    /// Checks the computed submit delay with and without the
    /// `max_submit_position` / `submit_delay_step_override` settings.
    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // Fixed seed so that positions are deterministic.
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // Adapter with both overrides set.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        let tx_digest = TransactionDigest::generate(&mut rng);

        // Everyone is connected and nobody is low scoring, so nothing is
        // filtered out and the position must not move.
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        // The position is capped at 1 and the delay step is overridden to 2s.
        let (delay, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Adapter without overrides.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        // Default latency of 1s doubled gives a 2s step; 7 * 2s = 14s.
        assert_eq!(delay, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }

    /// Checks that for every digest exactly one authority gets position 0 and
    /// all positions are within the committee size.
    #[test]
    fn test_position_submit_certificate() {
        // Fixed seed so that the test is deterministic.
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    // Position 0 must be unique.
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}