1use std::{
6 collections::HashMap,
7 future::Future,
8 ops::Deref,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use consensus_core::BlockStatus;
18use dashmap::{DashMap, try_result::TryResult};
19use futures::{
20 FutureExt, StreamExt,
21 future::{self, Either, select},
22 pin_mut,
23 stream::FuturesUnordered,
24};
25use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, spawn_monitored_task};
26use iota_simulator::anemo::PeerId;
27use iota_types::{
28 base_types::{AuthorityName, TransactionDigest},
29 committee::Committee,
30 error::{IotaError, IotaResult},
31 fp_ensure,
32 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
33};
34use itertools::Itertools;
35use parking_lot::RwLockReadGuard;
36use prometheus::{
37 Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
38 register_histogram_vec_with_registry, register_histogram_with_registry,
39 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
40 register_int_gauge_with_registry,
41};
42use tokio::{
43 sync::{Semaphore, SemaphorePermit, oneshot},
44 task::JoinHandle,
45 time::{self, Duration},
46};
47use tracing::{debug, info, trace, warn};
48
49use crate::{
50 authority::authority_per_epoch_store::AuthorityPerEpochStore,
51 connection_monitor::ConnectionStatus,
52 consensus_handler::{SequencedConsensusTransactionKey, classify},
53 epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
54 metrics::LatencyObserver,
55};
56
57#[cfg(test)]
58#[path = "unit_tests/consensus_tests.rs"]
59pub mod consensus_tests;
60
/// Histogram bucket boundaries, in seconds, shared by the
/// `sequencing_acknowledge_latency` and `sequencing_certificate_latency`
/// histograms (both observe `Duration::as_secs_f64()` values).
const SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.1, 0.25, 0.5, 0.75, 1., 1.25, 1.5, 1.75, 2., 2.25, 2.5, 2.75, 3., 4., 5., 6., 7., 10., 15.,
    20., 25., 30., 60., 90., 120., 150., 180., 210., 240., 270., 300.,
];
65
/// Histogram bucket boundaries for the committee-position histograms
/// (`sequencing_certificate_authority_position`,
/// `sequencing_certificate_positions_moved` and
/// `sequencing_certificate_preceding_disconnected`). Values are counts of
/// authorities, not time.
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];
69
/// Prometheus metrics recorded by [`ConsensusAdapter`] while sequencing
/// transactions through consensus.
pub struct ConsensusAdapterMetrics {
    /// Submission attempts, labelled by `tx_type`.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Successfully completed submissions, labelled by `tx_type`.
    pub sequencing_certificate_success: IntCounterVec,
    /// Errors returned by the consensus client on submit, labelled by `tx_type`.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Block status reported by consensus (`sequenced` / `garbage_collected`),
    /// labelled by `tx_type` and `status`.
    pub sequencing_certificate_status: IntCounterVec,
    /// Currently in-flight submissions, labelled by `tx_type`.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Time until the consensus client accepted a submission, labelled by the
    /// retry bucket and `tx_type`.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// End-to-end sequencing latency, labelled by position, `tx_type` and how
    /// the transaction was observed as processed.
    pub sequencing_certificate_latency: HistogramVec,
    /// Our position in the (filtered) submission order.
    pub sequencing_certificate_authority_position: Histogram,
    /// How many positions we moved up after filtering the submission order.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Number of disconnected authorities that preceded us in the submission order.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Processed notifications, labelled by `source`
    /// (`consensus` / `checkpoint` / `synced_checkpoint`).
    pub sequencing_certificate_processed: IntCounterVec,
    /// Tasks currently waiting for a submit semaphore permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Transactions submitted to the local consensus instance and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Estimated consensus latency in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Resubmission delay step in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
}
88
impl ConsensusAdapterMetrics {
    /// Registers every adapter metric with `registry`.
    ///
    /// # Panics
    /// Panics (via `unwrap`) if any metric fails to register, e.g. when a
    /// metric with the same name is already registered in `registry`.
    pub fn new(registry: &Registry) -> Self {
        Self {
            sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
                "sequencing_certificate_attempt",
                "Counts the number of certificates the validator attempts to sequence.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_success: register_int_counter_vec_with_registry!(
                "sequencing_certificate_success",
                "Counts the number of successfully sequenced certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_failures: register_int_counter_vec_with_registry!(
                "sequencing_certificate_failures",
                "Counts the number of sequenced certificates that failed other than by timeout.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_status: register_int_counter_vec_with_registry!(
                "sequencing_certificate_status",
                "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
                &["tx_type", "status"],
                registry,
            )
            .unwrap(),
            sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
                "sequencing_certificate_inflight",
                "The inflight requests to sequence certificates.",
                &["tx_type"],
                registry,
            )
            .unwrap(),
            sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
                "sequencing_acknowledge_latency",
                "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
                &["retry", "tx_type"],
                SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_latency: register_histogram_vec_with_registry!(
                "sequencing_certificate_latency",
                "The latency for sequencing a certificate.",
                &["position", "tx_type", "processed_method"],
                SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_authority_position: register_histogram_with_registry!(
                "sequencing_certificate_authority_position",
                "The position of the authority when submitted a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_positions_moved: register_histogram_with_registry!(
                "sequencing_certificate_positions_moved",
                "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
                "sequencing_certificate_preceding_disconnected",
                "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
                SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            sequencing_certificate_processed: register_int_counter_vec_with_registry!(
                "sequencing_certificate_processed",
                "The number of certificates that have been processed either by consensus or checkpoint.",
                &["source"],
                registry
            )
            .unwrap(),
            sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
                "sequencing_in_flight_semaphore_wait",
                "How many requests are blocked on submit_permit.",
                registry,
            )
            .unwrap(),
            sequencing_in_flight_submissions: register_int_gauge_with_registry!(
                "sequencing_in_flight_submissions",
                "Number of transactions submitted to local consensus instance and not yet sequenced",
                registry,
            )
            .unwrap(),
            sequencing_estimated_latency: register_int_gauge_with_registry!(
                "sequencing_estimated_latency",
                "Consensus latency estimated by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
            sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
                "sequencing_resubmission_interval_ms",
                "Resubmission interval used by consensus adapter in milliseconds",
                registry,
            )
            .unwrap(),
        }
    }

    /// Builds the metrics against a fresh, throw-away registry (for tests).
    pub fn new_test() -> Self {
        Self::new(&Registry::default())
    }
}
202
/// One-shot channel on which the consensus client reports the final
/// [`BlockStatus`] of the block that contained a submitted transaction.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatus>;
204
/// High-level entry point for handing transactions to consensus; implemented
/// below for `Arc<ConsensusAdapter>` on top of `submit_batch`.
#[mockall::automock]
#[async_trait::async_trait]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` for sequencing in the epoch of `epoch_store`.
    async fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}
214
/// Low-level client that forwards transactions to the consensus engine.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` and, on success, returns a receiver that later
    /// yields the status (sequenced or garbage collected) of the block that
    /// included them.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
224
/// Submits transactions to consensus on behalf of one authority, staggering
/// submissions across the committee and tracking each one until it has been
/// processed via consensus or a checkpoint.
pub struct ConsensusAdapter {
    /// Client used to hand transactions to the consensus engine.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Name of the authority this adapter submits for.
    authority: AuthorityName,
    /// Upper bound on `num_inflight_transactions` enforced by `check_limits`.
    max_pending_transactions: usize,
    /// Number of submissions currently tracked (incremented when an
    /// `InflightDropGuard` is acquired, decremented when it is dropped).
    num_inflight_transactions: AtomicU64,
    /// Optional cap on the position used to compute the submission delay.
    max_submit_position: Option<usize>,
    /// Optional fixed delay step that replaces the latency-derived step.
    submit_delay_step_override: Option<Duration>,
    /// Connectivity source used to skip disconnected authorities when
    /// computing our submission position.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// Authorities treated as low-scoring when computing the submission
    /// position. The outer `ArcSwap` is replaced by
    /// `swap_low_scoring_authorities`; NOTE(review): the inner handle is
    /// presumably updated by whoever shares it — confirm the owner.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// Prometheus metrics.
    metrics: ConsensusAdapterMetrics,
    /// Bounds the number of concurrent submissions to the consensus client.
    submit_semaphore: Semaphore,
    /// Collects latencies of sampled submissions to estimate consensus latency.
    latency_observer: LatencyObserver,
}
253
/// Source of peer connectivity information for the submission-order logic.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status between `ourself` and `authority`, or
    /// `None` when it is unknown.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Replaces the authority-name → peer-id mapping for a new epoch.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
262
/// [`CheckConnection`] implementation backed by the connection monitor's
/// per-peer status map.
pub struct ConnectionMonitorStatus {
    /// Current connection status per network peer.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// Mapping from authority names to network peer ids; swapped every epoch.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
269
/// [`CheckConnection`] stub for tests that reports every authority as connected.
pub struct ConnectionMonitorStatusForTests {}
271
272impl ConsensusAdapter {
273 pub fn new(
275 consensus_client: Arc<dyn ConsensusClient>,
276 authority: AuthorityName,
277 connection_monitor_status: Arc<dyn CheckConnection>,
278 max_pending_transactions: usize,
279 max_pending_local_submissions: usize,
280 max_submit_position: Option<usize>,
281 submit_delay_step_override: Option<Duration>,
282 metrics: ConsensusAdapterMetrics,
283 ) -> Self {
284 let num_inflight_transactions = Default::default();
285 let low_scoring_authorities =
286 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
287 Self {
288 consensus_client,
289 authority,
290 max_pending_transactions,
291 max_submit_position,
292 submit_delay_step_override,
293 num_inflight_transactions,
294 connection_monitor_status,
295 low_scoring_authorities,
296 metrics,
297 submit_semaphore: Semaphore::new(max_pending_local_submissions),
298 latency_observer: LatencyObserver::new(),
299 }
300 }
301
302 pub fn swap_low_scoring_authorities(
303 &self,
304 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
305 ) {
306 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
307 }
308
309 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
312 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
318
319 #[expect(clippy::collapsible_if)] if epoch_store
321 .get_reconfig_state_read_lock_guard()
322 .is_reject_user_certs()
323 && epoch_store.pending_consensus_certificates_empty()
324 {
325 if recovered
326 .iter()
327 .any(ConsensusTransaction::is_end_of_publish)
328 {
329 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
338 }
339 }
340 debug!(
341 "Submitting {:?} recovered pending consensus transactions to consensus",
342 recovered.len()
343 );
344 for transaction in recovered {
345 if transaction.is_end_of_publish() {
346 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
347 }
348 self.submit_unchecked(&[transaction], epoch_store);
349 }
350 }
351
352 fn await_submit_delay(
353 &self,
354 committee: &Committee,
355 transactions: &[ConsensusTransaction],
356 ) -> (impl Future<Output = ()>, usize, usize, usize) {
357 let min_digest = transactions
359 .iter()
360 .filter_map(|tx| match &tx.kind {
361 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
362 Some(certificate.digest())
363 }
364 _ => None,
365 })
366 .min();
367
368 let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
369 Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
370 _ => (Duration::ZERO, 0, 0, 0),
371 };
372 (
373 tokio::time::sleep(duration),
374 position,
375 positions_moved,
376 preceding_disconnected,
377 )
378 }
379
380 fn await_submit_delay_user_transaction(
381 &self,
382 committee: &Committee,
383 tx_digest: &TransactionDigest,
384 ) -> (Duration, usize, usize, usize) {
385 let (position, positions_moved, preceding_disconnected) =
386 self.submission_position(committee, tx_digest);
387
388 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
390 const MAX_LATENCY: Duration = Duration::from_secs(3);
391
392 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
393 self.metrics
394 .sequencing_estimated_latency
395 .set(latency.as_millis() as i64);
396
397 let latency = std::cmp::max(latency, MIN_LATENCY);
398 let latency = std::cmp::min(latency, MAX_LATENCY);
399 let latency = latency * 2;
400 let (delay_step, position) =
401 self.override_by_max_submit_position_settings(latency, position);
402
403 self.metrics
404 .sequencing_resubmission_interval_ms
405 .set(delay_step.as_millis() as i64);
406
407 (
408 delay_step * position as u32,
409 position,
410 positions_moved,
411 preceding_disconnected,
412 )
413 }
414
415 fn override_by_max_submit_position_settings(
421 &self,
422 latency: Duration,
423 mut position: usize,
424 ) -> (Duration, usize) {
425 if let Some(max_submit_position) = self.max_submit_position {
427 position = std::cmp::min(position, max_submit_position);
428 }
429
430 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
431 (delay_step, position)
432 }
433
434 fn submission_position(
444 &self,
445 committee: &Committee,
446 tx_digest: &TransactionDigest,
447 ) -> (usize, usize, usize) {
448 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
449
450 self.check_submission_wrt_connectivity_and_scores(positions)
451 }
452
453 fn check_submission_wrt_connectivity_and_scores(
480 &self,
481 positions: Vec<AuthorityName>,
482 ) -> (usize, usize, usize) {
483 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
484 if low_scoring_authorities.get(&self.authority).is_some() {
485 return (positions.len(), 0, 0);
486 }
487 let initial_position = get_position_in_list(self.authority, positions.clone());
488 let mut preceding_disconnected = 0;
489 let mut before_our_position = true;
490
491 let filtered_positions: Vec<_> = positions
492 .into_iter()
493 .filter(|authority| {
494 let keep = self.authority == *authority; if keep {
496 before_our_position = false;
497 }
498
499 let connected = self
501 .connection_monitor_status
502 .check_connection(&self.authority, authority)
503 .unwrap_or(ConnectionStatus::Disconnected)
504 == ConnectionStatus::Connected;
505 if !connected && before_our_position {
506 preceding_disconnected += 1; }
508
509 let high_scoring = low_scoring_authorities.get(authority).is_none();
511
512 keep || (connected && high_scoring)
513 })
514 .collect();
515
516 let position = get_position_in_list(self.authority, filtered_positions);
517
518 (
519 position,
520 initial_position - position,
521 preceding_disconnected,
522 )
523 }
524
525 pub fn submit(
536 self: &Arc<Self>,
537 transaction: ConsensusTransaction,
538 lock: Option<&RwLockReadGuard<ReconfigState>>,
539 epoch_store: &Arc<AuthorityPerEpochStore>,
540 ) -> IotaResult<JoinHandle<()>> {
541 self.submit_batch(&[transaction], lock, epoch_store)
542 }
543
544 pub fn submit_batch(
545 self: &Arc<Self>,
546 transactions: &[ConsensusTransaction],
547 lock: Option<&RwLockReadGuard<ReconfigState>>,
548 epoch_store: &Arc<AuthorityPerEpochStore>,
549 ) -> IotaResult<JoinHandle<()>> {
550 if transactions.len() > 1 {
551 for transaction in transactions {
555 fp_ensure!(
556 matches!(
557 transaction.kind,
558 ConsensusTransactionKind::CertifiedTransaction(_)
559 ),
560 IotaError::InvalidTxKindInSoftBundle
561 );
562 }
563 }
564
565 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
566 Ok(self.submit_unchecked(transactions, epoch_store))
567 }
568
569 pub fn check_limits(&self) -> bool {
572 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
574 > self.max_pending_transactions
575 {
576 return false;
577 }
578 self.submit_semaphore.available_permits() > 0
580 }
581
582 pub(crate) fn check_consensus_overload(&self) -> IotaResult {
583 fp_ensure!(
584 self.check_limits(),
585 IotaError::TooManyTransactionsPendingConsensus
586 );
587 Ok(())
588 }
589
590 fn submit_unchecked(
591 self: &Arc<Self>,
592 transactions: &[ConsensusTransaction],
593 epoch_store: &Arc<AuthorityPerEpochStore>,
594 ) -> JoinHandle<()> {
595 let async_stage = self
598 .clone()
599 .submit_and_wait(transactions.to_vec(), epoch_store.clone());
600 let join_handle = spawn_monitored_task!(async_stage);
603 join_handle
604 }
605
606 async fn submit_and_wait(
607 self: Arc<Self>,
608 transactions: Vec<ConsensusTransaction>,
609 epoch_store: Arc<AuthorityPerEpochStore>,
610 ) {
611 epoch_store
625 .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
626 .await
627 .ok(); }
630
631 async fn submit_and_wait_inner(
632 self: Arc<Self>,
633 transactions: Vec<ConsensusTransaction>,
634 epoch_store: &Arc<AuthorityPerEpochStore>,
635 ) {
636 if transactions.is_empty() {
637 return;
638 }
639
640 let is_soft_bundle = transactions.len() > 1;
647
648 let mut transaction_keys = Vec::new();
649
650 for transaction in &transactions {
651 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
652 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
653 epoch_store.record_epoch_pending_certs_process_time_metric();
654 }
655
656 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
657 transaction_keys.push(transaction_key);
658 }
659 let tx_type = if !is_soft_bundle {
660 classify(&transactions[0])
661 } else {
662 "soft_bundle"
663 };
664
665 let mut guard = InflightDropGuard::acquire(&self, tx_type);
666
667 let (await_submit, position, positions_moved, preceding_disconnected) =
669 self.await_submit_delay(epoch_store.committee(), &transactions[..]);
670
671 let processed_via_consensus_or_checkpoint =
674 self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
675 pin_mut!(processed_via_consensus_or_checkpoint);
676
677 let processed_waiter = tokio::select! {
678 _ = await_submit => Some(processed_via_consensus_or_checkpoint),
680
681 _ = epoch_store.user_certs_closed_notify() => {
683 warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
684 Some(processed_via_consensus_or_checkpoint)
685 }
686
687 _ = &mut processed_via_consensus_or_checkpoint => {
689 None
690 }
691 };
692
693 let _monitor = if !is_soft_bundle
695 && matches!(
696 transactions[0].kind,
697 ConsensusTransactionKind::EndOfPublish(_)
698 | ConsensusTransactionKind::CapabilityNotificationV1(_)
699 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
700 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
701 ) {
702 let transaction_keys = transaction_keys.clone();
703 Some(CancelOnDrop(spawn_monitored_task!(async {
704 let mut i = 0u64;
705 loop {
706 i += 1;
707 const WARN_DELAY_S: u64 = 30;
708 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
709 let total_wait = i * WARN_DELAY_S;
710 warn!(
711 "Still waiting {} seconds for transactions {:?} to commit in consensus",
712 total_wait, transaction_keys
713 );
714 }
715 })))
716 } else {
717 None
718 };
719
720 if let Some(processed_waiter) = processed_waiter {
721 debug!("Submitting {:?} to consensus", transaction_keys);
722
723 guard.position = Some(position);
726 guard.positions_moved = Some(positions_moved);
727 guard.preceding_disconnected = Some(preceding_disconnected);
728
729 let _permit: SemaphorePermit = self
730 .submit_semaphore
731 .acquire()
732 .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
733 .await
734 .expect("Consensus adapter does not close semaphore");
735 let _in_flight_submission_guard =
736 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
737
738 let submit_inner = async {
742 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
743
744 loop {
745 let status_waiter = self
748 .submit_inner(
749 &transactions,
750 epoch_store,
751 &transaction_keys,
752 tx_type,
753 is_soft_bundle,
754 )
755 .await;
756
757 match status_waiter.await {
758 Ok(BlockStatus::Sequenced(_)) => {
759 self.metrics
760 .sequencing_certificate_status
761 .with_label_values(&[tx_type, "sequenced"])
762 .inc();
763 trace!(
766 "Transaction {transaction_keys:?} has been sequenced by consensus."
767 );
768 break;
769 }
770 Ok(BlockStatus::GarbageCollected(_)) => {
771 self.metrics
772 .sequencing_certificate_status
773 .with_label_values(&[tx_type, "garbage_collected"])
774 .inc();
775 debug!(
783 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
784 );
785 time::sleep(RETRY_DELAY_STEP).await;
786 continue;
787 }
788 Err(err) => {
789 warn!(
790 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
791 err
792 );
793 time::sleep(RETRY_DELAY_STEP).await;
794 continue;
795 }
796 }
797 }
798 };
799
800 guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
801 Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
802 Either::Right(((), processed_waiter)) => {
803 debug!("Submitted {transaction_keys:?} to consensus");
804 processed_waiter.await
805 }
806 };
807 }
808 debug!("{transaction_keys:?} processed by consensus");
809
810 let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
811 epoch_store
812 .remove_pending_consensus_transactions(&consensus_keys)
813 .expect("Storage error when removing consensus transaction");
814
815 let is_user_tx = is_soft_bundle
816 || matches!(
817 transactions[0].kind,
818 ConsensusTransactionKind::CertifiedTransaction(_)
819 );
820 let send_end_of_publish = if is_user_tx {
821 if epoch_store
831 .get_reconfig_state_read_lock_guard()
832 .is_reject_user_certs()
833 {
834 let pending_count = epoch_store.pending_consensus_certificates_count();
835 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
836 pending_count == 0 } else {
838 false
839 }
840 } else {
841 false
842 };
843 if send_end_of_publish {
844 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
846 if let Err(err) = self.submit(
847 ConsensusTransaction::new_end_of_publish(self.authority),
848 None,
849 epoch_store,
850 ) {
851 warn!("Error when sending end of publish message: {:?}", err);
852 }
853 }
854 self.metrics
855 .sequencing_certificate_success
856 .with_label_values(&[tx_type])
857 .inc();
858 }
859
860 async fn submit_inner(
861 self: &Arc<Self>,
862 transactions: &[ConsensusTransaction],
863 epoch_store: &Arc<AuthorityPerEpochStore>,
864 transaction_keys: &[SequencedConsensusTransactionKey],
865 tx_type: &str,
866 is_soft_bundle: bool,
867 ) -> BlockStatusReceiver {
868 let ack_start = Instant::now();
869 let mut retries: u32 = 0;
870
871 let status_waiter = loop {
872 match self
873 .consensus_client
874 .submit(transactions, epoch_store)
875 .await
876 {
877 Err(err) => {
878 if retries > 30
881 || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
882 {
883 warn!(
884 "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
885 );
886 }
887 self.metrics
888 .sequencing_certificate_failures
889 .with_label_values(&[tx_type])
890 .inc();
891 retries += 1;
892
893 if !is_soft_bundle && transactions[0].kind.is_dkg() {
894 time::sleep(Duration::from_millis(100)).await;
897 } else {
898 time::sleep(Duration::from_secs(10)).await;
899 };
900 }
901 Ok(status_waiter) => {
902 break status_waiter;
903 }
904 }
905 };
906
907 let bucket = match retries {
911 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
913 21..=50 => "between_20_and_50".to_string(),
914 51..=100 => "between_50_and_100".to_string(),
915 _ => "over_100".to_string(),
916 };
917
918 self.metrics
919 .sequencing_acknowledge_latency
920 .with_label_values(&[bucket.as_str(), tx_type])
921 .observe(ack_start.elapsed().as_secs_f64());
922
923 status_waiter
924 }
925
926 async fn await_consensus_or_checkpoint(
931 self: &Arc<Self>,
932 transaction_keys: Vec<SequencedConsensusTransactionKey>,
933 epoch_store: &Arc<AuthorityPerEpochStore>,
934 ) -> ProcessedMethod {
935 let notifications = FuturesUnordered::new();
936 for transaction_key in transaction_keys {
937 let transaction_digests = if let SequencedConsensusTransactionKey::External(
938 ConsensusTransactionKey::Certificate(digest),
939 ) = transaction_key
940 {
941 vec![digest]
942 } else {
943 vec![]
944 };
945
946 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
947 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
948 ) = transaction_key
949 {
950 Either::Left(epoch_store.synced_checkpoint_notify(checkpoint_sequence_number))
955 } else {
956 Either::Right(future::pending())
957 };
958
959 notifications.push(async move {
966 tokio::select! {
967 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
968 processed.expect("Storage error when waiting for consensus message processed");
969 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
970 return ProcessedMethod::Consensus;
971 },
972 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
973 processed.expect("Storage error when waiting for transaction executed in checkpoint");
974 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
975 }
976 processed = checkpoint_synced_future => {
977 processed.expect("Error when waiting for checkpoint sequence number");
978 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
979 }
980 }
981 ProcessedMethod::Checkpoint
982 });
983 }
984
985 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
986 for method in processed_methods {
987 if method == ProcessedMethod::Checkpoint {
988 return ProcessedMethod::Checkpoint;
989 }
990 }
991 ProcessedMethod::Consensus
992 }
993}
994
995impl CheckConnection for ConnectionMonitorStatus {
996 fn check_connection(
997 &self,
998 ourself: &AuthorityName,
999 authority: &AuthorityName,
1000 ) -> Option<ConnectionStatus> {
1001 if ourself == authority {
1002 return Some(ConnectionStatus::Connected);
1003 }
1004
1005 let mapping = self.authority_names_to_peer_ids.load_full();
1006 let peer_id = match mapping.get(authority) {
1007 Some(p) => p,
1008 None => {
1009 warn!(
1010 "failed to find peer {:?} in connection monitor listener",
1011 authority
1012 );
1013 return None;
1014 }
1015 };
1016
1017 let res = match self.connection_statuses.try_get(peer_id) {
1018 TryResult::Present(c) => Some(c.value().clone()),
1019 TryResult::Absent => None,
1020 TryResult::Locked => {
1021 Some(ConnectionStatus::Disconnected)
1023 }
1024 };
1025 res
1026 }
1027 fn update_mapping_for_epoch(
1028 &self,
1029 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1030 ) {
1031 self.authority_names_to_peer_ids
1032 .swap(Arc::new(authority_names_to_peer_ids));
1033 }
1034}
1035
1036impl CheckConnection for ConnectionMonitorStatusForTests {
1037 fn check_connection(
1038 &self,
1039 _ourself: &AuthorityName,
1040 _authority: &AuthorityName,
1041 ) -> Option<ConnectionStatus> {
1042 Some(ConnectionStatus::Connected)
1043 }
1044 fn update_mapping_for_epoch(
1045 &self,
1046 _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1047 ) {
1048 }
1049}
1050
1051pub fn get_position_in_list(
1052 search_authority: AuthorityName,
1053 positions: Vec<AuthorityName>,
1054) -> usize {
1055 positions
1056 .into_iter()
1057 .find_position(|authority| *authority == search_authority)
1058 .expect("Couldn't find ourselves in shuffled committee")
1059 .0
1060}
1061
impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
    /// Stops accepting user certificates for the epoch of `epoch_store`.
    ///
    /// Calling this again after user certificates are closed is a no-op. If no
    /// consensus certificates are pending at the moment of closing, an
    /// `EndOfPublish` message is submitted right away; otherwise the last
    /// submission to complete sends it.
    fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
        // The pending count is read and user certs are closed under the same
        // write lock; the lock is released before submitting.
        let send_end_of_publish = {
            let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
            if !reconfig_guard.should_accept_user_certs() {
                // Already closed: allow repeated calls.
                return;
            }
            let pending_count = epoch_store.pending_consensus_certificates_count();
            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
            let send_end_of_publish = pending_count == 0;
            epoch_store.close_user_certs(reconfig_guard);
            send_end_of_publish
        };
        if send_end_of_publish {
            info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
            if let Err(err) = self.submit(
                ConsensusTransaction::new_end_of_publish(self.authority),
                None,
                epoch_store,
            ) {
                warn!("Error when sending end of publish message: {:?}", err);
            }
        }
    }
}
1093
1094struct CancelOnDrop<T>(JoinHandle<T>);
1095
1096impl<T> Deref for CancelOnDrop<T> {
1097 type Target = JoinHandle<T>;
1098
1099 fn deref(&self) -> &Self::Target {
1100 &self.0
1101 }
1102}
1103
1104impl<T> Drop for CancelOnDrop<T> {
1105 fn drop(&mut self) {
1106 self.0.abort();
1107 }
1108}
1109
/// Tracks one in-flight submission. Creating it increments the in-flight
/// counters; dropping it decrements them and records the sequencing metrics.
struct InflightDropGuard<'a> {
    /// Adapter whose counters and metrics are updated.
    adapter: &'a ConsensusAdapter,
    /// When tracking started; used for the sequencing latency metric.
    start: Instant,
    /// Our submission position; `None` when we never submitted ourselves.
    position: Option<usize>,
    /// Positions moved up after filtering; `None` when not submitted.
    positions_moved: Option<usize>,
    /// Disconnected authorities ahead of us; `None` when not submitted.
    preceding_disconnected: Option<usize>,
    /// Transaction type label used for metrics.
    tx_type: &'static str,
    /// How the transaction was observed as processed.
    processed_method: ProcessedMethod,
}
1120
/// How a submitted transaction was observed as processed.
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    /// Processed through consensus.
    Consensus,
    /// Observed via checkpoint execution or checkpoint sync.
    Checkpoint,
}
1126
1127impl<'a> InflightDropGuard<'a> {
1128 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1129 adapter
1130 .num_inflight_transactions
1131 .fetch_add(1, Ordering::SeqCst);
1132 adapter
1133 .metrics
1134 .sequencing_certificate_inflight
1135 .with_label_values(&[tx_type])
1136 .inc();
1137 adapter
1138 .metrics
1139 .sequencing_certificate_attempt
1140 .with_label_values(&[tx_type])
1141 .inc();
1142 Self {
1143 adapter,
1144 start: Instant::now(),
1145 position: None,
1146 positions_moved: None,
1147 preceding_disconnected: None,
1148 tx_type,
1149 processed_method: ProcessedMethod::Consensus,
1150 }
1151 }
1152}
1153
1154impl Drop for InflightDropGuard<'_> {
1155 fn drop(&mut self) {
1156 self.adapter
1157 .num_inflight_transactions
1158 .fetch_sub(1, Ordering::SeqCst);
1159 self.adapter
1160 .metrics
1161 .sequencing_certificate_inflight
1162 .with_label_values(&[self.tx_type])
1163 .dec();
1164
1165 let position = if let Some(position) = self.position {
1166 self.adapter
1167 .metrics
1168 .sequencing_certificate_authority_position
1169 .observe(position as f64);
1170 position.to_string()
1171 } else {
1172 "not_submitted".to_string()
1173 };
1174
1175 if let Some(positions_moved) = self.positions_moved {
1176 self.adapter
1177 .metrics
1178 .sequencing_certificate_positions_moved
1179 .observe(positions_moved as f64);
1180 };
1181
1182 if let Some(preceding_disconnected) = self.preceding_disconnected {
1183 self.adapter
1184 .metrics
1185 .sequencing_certificate_preceding_disconnected
1186 .observe(preceding_disconnected as f64);
1187 };
1188
1189 let latency = self.start.elapsed();
1190 let processed_method = match self.processed_method {
1191 ProcessedMethod::Consensus => "processed_via_consensus",
1192 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1193 };
1194 self.adapter
1195 .metrics
1196 .sequencing_certificate_latency
1197 .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1198 .observe(latency.as_secs_f64());
1199
1200 if self.position == Some(0) {
1206 let sampled = matches!(
1209 self.tx_type,
1210 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1211 );
1212 if sampled && self.processed_method == ProcessedMethod::Consensus {
1215 self.adapter.latency_observer.report(latency);
1216 }
1217 }
1218 }
1219}
1220
1221#[async_trait::async_trait]
1222impl SubmitToConsensus for Arc<ConsensusAdapter> {
1223 async fn submit_to_consensus(
1224 &self,
1225 transactions: &[ConsensusTransaction],
1226 epoch_store: &Arc<AuthorityPerEpochStore>,
1227 ) -> IotaResult {
1228 self.submit_batch(transactions, None, epoch_store)
1229 .map(|_| ())
1230 }
1231}
1232
1233pub fn position_submit_certificate(
1234 committee: &Committee,
1235 ourselves: &AuthorityName,
1236 tx_digest: &TransactionDigest,
1237) -> usize {
1238 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1239 get_position_in_list(*ourselves, validators)
1240}
1241
#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        mysticeti_adapter::LazyMysticetiClient,
    };

    /// Builds a committee of `size` random authorities with random stakes in `0..10`.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.iter().cloned().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // Cap the position at 1 and fix the delay step at 2s.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        let tx_digest = TransactionDigest::generate(&mut rng);

        // Every authority is connected and none is low-scoring, so filtering
        // must not move us. (Previously `!positions_moved > 0` was a bitwise
        // NOT on usize and therefore always true.)
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Without overrides the default 1s latency is doubled to a 2s step.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        // 7 positions * 2s step.
        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }

    #[test]
    fn test_position_submit_certificate() {
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            // Exactly one member must be at position 0 for every digest.
            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}