1use std::{
6 collections::HashMap,
7 future::Future,
8 ops::Deref,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use dashmap::{DashMap, try_result::TryResult};
18use futures::{
19 FutureExt, StreamExt,
20 future::{self, Either, select},
21 pin_mut,
22 stream::FuturesUnordered,
23};
24use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
25use iota_simulator::anemo::PeerId;
26use iota_types::{
27 base_types::{AuthorityName, TransactionDigest},
28 committee::Committee,
29 error::{IotaError, IotaResult},
30 fp_ensure,
31 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
32};
33use itertools::Itertools;
34use parking_lot::RwLockReadGuard;
35use prometheus::{
36 Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
37 register_histogram_vec_with_registry, register_histogram_with_registry,
38 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
39 register_int_gauge_with_registry,
40};
41use tokio::{
42 sync::{Semaphore, SemaphorePermit, oneshot},
43 task::JoinHandle,
44 time::{
45 Duration, {self},
46 },
47};
48use tracing::{debug, info, trace, warn};
49
50use crate::{
51 authority::authority_per_epoch_store::AuthorityPerEpochStore,
52 checkpoints::CheckpointStore,
53 connection_monitor::ConnectionStatus,
54 consensus_handler::{SequencedConsensusTransactionKey, classify},
55 epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
56 metrics::LatencyObserver,
57};
58
// Unit tests for this module, compiled only for test builds; the source file
// lives at `unit_tests/consensus_tests.rs`.
#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;
62
/// Histogram buckets for the position-like metrics (authority position,
/// positions moved, preceding disconnected authorities). Values count
/// authorities, so they are small integers expressed as `f64`.
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0.0, 1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 50.0, 100.0, 150.0, 200.0,
];
66
/// Prometheus metrics reported by [`ConsensusAdapter`].
pub struct ConsensusAdapterMetrics {
    /// Certificates the validator attempts to sequence, labelled by `tx_type`.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Successfully sequenced certificates, labelled by `tx_type`.
    pub sequencing_certificate_success: IntCounterVec,
    /// Failed attempts to hand transactions to the consensus client, labelled by `tx_type`.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Block status reported by consensus ("sequenced" / "garbage_collected"),
    /// labelled by `tx_type` and `status`.
    pub sequencing_certificate_status: IntCounterVec,
    /// Currently in-flight sequencing requests, labelled by `tx_type`.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Time until the consensus client accepted a submission, labelled by a
    /// retry-count bucket and `tx_type`.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// End-to-end sequencing latency, labelled by submission position, `tx_type`
    /// and how the transaction was observed as processed.
    pub sequencing_certificate_latency: HistogramVec,
    /// Submission position of this authority.
    pub sequencing_certificate_authority_position: Histogram,
    /// Number of authorities ahead of us that were filtered out when computing the position.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Number of disconnected authorities that preceded us in the shuffled order.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Transactions observed as processed, labelled by `source`
    /// ("consensus", "checkpoint" or "synced_checkpoint").
    pub sequencing_certificate_processed: IntCounterVec,
    /// Requests currently blocked waiting for a submission permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Transactions submitted to consensus and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Estimated consensus latency in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Per-position resubmission delay in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
}
85
86impl ConsensusAdapterMetrics {
87 pub fn new(registry: &Registry) -> Self {
88 Self {
89 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
90 "sequencing_certificate_attempt",
91 "Counts the number of certificates the validator attempts to sequence.",
92 &["tx_type"],
93 registry,
94 )
95 .unwrap(),
96 sequencing_certificate_success: register_int_counter_vec_with_registry!(
97 "sequencing_certificate_success",
98 "Counts the number of successfully sequenced certificates.",
99 &["tx_type"],
100 registry,
101 )
102 .unwrap(),
103 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
104 "sequencing_certificate_failures",
105 "Counts the number of sequenced certificates that failed other than by timeout.",
106 &["tx_type"],
107 registry,
108 )
109 .unwrap(),
110 sequencing_certificate_status: register_int_counter_vec_with_registry!(
111 "sequencing_certificate_status",
112 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
113 &["tx_type", "status"],
114 registry,
115 )
116 .unwrap(),
117 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
118 "sequencing_certificate_inflight",
119 "The inflight requests to sequence certificates.",
120 &["tx_type"],
121 registry,
122 )
123 .unwrap(),
124 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
125 "sequencing_acknowledge_latency",
126 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
127 &["retry", "tx_type"],
128 LATENCY_SEC_BUCKETS.to_vec(),
129 registry,
130 )
131 .unwrap(),
132 sequencing_certificate_latency: register_histogram_vec_with_registry!(
133 "sequencing_certificate_latency",
134 "The latency for sequencing a certificate.",
135 &["position", "tx_type", "processed_method"],
136 LATENCY_SEC_BUCKETS.to_vec(),
137 registry,
138 )
139 .unwrap(),
140 sequencing_certificate_authority_position: register_histogram_with_registry!(
141 "sequencing_certificate_authority_position",
142 "The position of the authority when submitted a certificate to consensus.",
143 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
144 registry,
145 )
146 .unwrap(),
147 sequencing_certificate_positions_moved: register_histogram_with_registry!(
148 "sequencing_certificate_positions_moved",
149 "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
150 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
151 registry,
152 )
153 .unwrap(),
154 sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
155 "sequencing_certificate_preceding_disconnected",
156 "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
157 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
158 registry,
159 )
160 .unwrap(),
161 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
162 "sequencing_certificate_processed",
163 "The number of certificates that have been processed either by consensus or checkpoint.",
164 &["source"],
165 registry
166 )
167 .unwrap(),
168 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
169 "sequencing_in_flight_semaphore_wait",
170 "How many requests are blocked on submit_permit.",
171 registry,
172 )
173 .unwrap(),
174 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
175 "sequencing_in_flight_submissions",
176 "Number of transactions submitted to local consensus instance and not yet sequenced",
177 registry,
178 )
179 .unwrap(),
180 sequencing_estimated_latency: register_int_gauge_with_registry!(
181 "sequencing_estimated_latency",
182 "Consensus latency estimated by consensus adapter in milliseconds",
183 registry,
184 )
185 .unwrap(),
186 sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
187 "sequencing_resubmission_interval_ms",
188 "Resubmission interval used by consensus adapter in milliseconds",
189 registry,
190 )
191 .unwrap(),
192 }
193 }
194
195 pub fn new_test() -> Self {
196 Self::new(&Registry::default())
197 }
198}
199
/// Consensus-engine-agnostic status of the block carrying a submitted
/// transaction, converted from either `consensus_core` or `starfish_core`.
pub enum BlockStatusInternal {
    /// The block was sequenced; the adapter stops resubmitting.
    Sequenced,
    /// The block was garbage collected before being sequenced; the adapter
    /// resubmits the transaction.
    GarbageCollected,
}
206
207impl From<consensus_core::BlockStatus> for BlockStatusInternal {
208 fn from(status: consensus_core::BlockStatus) -> Self {
209 match status {
210 consensus_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
211 consensus_core::BlockStatus::GarbageCollected(_) => {
212 BlockStatusInternal::GarbageCollected
213 }
214 }
215 }
216}
217impl From<starfish_core::BlockStatus> for BlockStatusInternal {
218 fn from(status: starfish_core::BlockStatus) -> Self {
219 match status {
220 starfish_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
221 starfish_core::BlockStatus::GarbageCollected(_) => {
222 BlockStatusInternal::GarbageCollected
223 }
224 }
225 }
226}
227
/// Receives the final [`BlockStatusInternal`] of a submission made through
/// [`ConsensusClient::submit`].
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatusInternal>;
229
/// Something that accepts transactions for submission to consensus.
/// `mockall::automock` generates a mock implementation for tests.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` in the context of the epoch held by `epoch_store`.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}
238
/// Low-level client that hands transactions to a consensus engine.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` and, on success, returns a receiver that later
    /// yields the status of the block containing them.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
248
/// Submits transactions to consensus on behalf of this authority and tracks
/// them until they are observed as processed, via consensus or via checkpoint.
pub struct ConsensusAdapter {
    /// Client used to hand transactions to the consensus engine.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Used to get notified when a checkpoint has been synced, which makes
    /// sequencing a checkpoint signature for it unnecessary.
    checkpoint_store: Arc<CheckpointStore>,
    /// Name of this authority.
    authority: AuthorityName,
    /// Upper bound on `num_inflight_transactions`, checked by `check_limits`.
    max_pending_transactions: usize,
    /// Number of transactions currently tracked by an `InflightDropGuard`.
    num_inflight_transactions: AtomicU64,
    /// Optional cap on the computed submission position.
    max_submit_position: Option<usize>,
    /// Optional fixed per-position delay that replaces the latency-derived one.
    submit_delay_step_override: Option<Duration>,
    /// Connectivity source used to skip disconnected authorities.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// Handle to the map of low-scoring authorities. The outer `ArcSwap` allows
    /// the handle itself to be replaced via `swap_low_scoring_authorities`.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// Metrics reported by the adapter.
    metrics: ConsensusAdapterMetrics,
    /// Bounds how many transactions are concurrently in the submission stage.
    submit_semaphore: Semaphore,
    /// Observed consensus latency, used to derive the submission delay.
    latency_observer: LatencyObserver,
}
279
/// Reports connectivity between authorities, used to reorder submissions.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status of `authority` as seen from `ourself`,
    /// or `None` when it cannot be determined.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Installs a new authority-name -> peer-id mapping.
    /// NOTE(review): presumably called once per epoch change — confirm call site.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
288
/// [`CheckConnection`] implementation backed by per-peer connection statuses.
pub struct ConnectionMonitorStatus {
    /// Connection status per network peer.
    /// NOTE(review): presumably updated by the connection monitor — confirm.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// Maps authority names to the peer ids used as keys in `connection_statuses`.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
295
/// Test [`CheckConnection`] implementation that reports every authority as connected.
pub struct ConnectionMonitorStatusForTests {}
297
298impl ConsensusAdapter {
299 pub fn new(
301 consensus_client: Arc<dyn ConsensusClient>,
302 checkpoint_store: Arc<CheckpointStore>,
303 authority: AuthorityName,
304 connection_monitor_status: Arc<dyn CheckConnection>,
305 max_pending_transactions: usize,
306 max_pending_local_submissions: usize,
307 max_submit_position: Option<usize>,
308 submit_delay_step_override: Option<Duration>,
309 metrics: ConsensusAdapterMetrics,
310 ) -> Self {
311 let num_inflight_transactions = Default::default();
312 let low_scoring_authorities =
313 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
314 Self {
315 consensus_client,
316 checkpoint_store,
317 authority,
318 max_pending_transactions,
319 max_submit_position,
320 submit_delay_step_override,
321 num_inflight_transactions,
322 connection_monitor_status,
323 low_scoring_authorities,
324 metrics,
325 submit_semaphore: Semaphore::new(max_pending_local_submissions),
326 latency_observer: LatencyObserver::new(),
327 }
328 }
329
330 pub fn swap_low_scoring_authorities(
331 &self,
332 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
333 ) {
334 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
335 }
336
337 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
338 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
344
345 #[expect(clippy::collapsible_if)] if epoch_store
347 .get_reconfig_state_read_lock_guard()
348 .is_reject_user_certs()
349 && epoch_store.pending_consensus_certificates_empty()
350 {
351 if recovered
352 .iter()
353 .any(ConsensusTransaction::is_end_of_publish)
354 {
355 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
364 }
365 }
366 debug!(
367 "Submitting {:?} recovered pending consensus transactions to consensus",
368 recovered.len()
369 );
370 for transaction in recovered {
371 if transaction.is_end_of_publish() {
372 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
373 }
374 self.submit_unchecked(&[transaction], epoch_store);
375 }
376 }
377
378 fn await_submit_delay(
379 &self,
380 committee: &Committee,
381 transactions: &[ConsensusTransaction],
382 ) -> (impl Future<Output = ()>, usize, usize, usize) {
383 let min_digest = transactions
385 .iter()
386 .filter_map(|tx| match &tx.kind {
387 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
388 Some(certificate.digest())
389 }
390 _ => None,
391 })
392 .min();
393
394 let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
395 Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
396 _ => (Duration::ZERO, 0, 0, 0),
397 };
398 (
399 tokio::time::sleep(duration),
400 position,
401 positions_moved,
402 preceding_disconnected,
403 )
404 }
405
406 fn await_submit_delay_user_transaction(
407 &self,
408 committee: &Committee,
409 tx_digest: &TransactionDigest,
410 ) -> (Duration, usize, usize, usize) {
411 let (position, positions_moved, preceding_disconnected) =
412 self.submission_position(committee, tx_digest);
413
414 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
416 const MAX_LATENCY: Duration = Duration::from_secs(3);
417
418 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
419 self.metrics
420 .sequencing_estimated_latency
421 .set(latency.as_millis() as i64);
422
423 let latency = std::cmp::max(latency, MIN_LATENCY);
424 let latency = std::cmp::min(latency, MAX_LATENCY);
425 let latency = latency * 2;
426 let (delay_step, position) =
427 self.override_by_max_submit_position_settings(latency, position);
428
429 self.metrics
430 .sequencing_resubmission_interval_ms
431 .set(delay_step.as_millis() as i64);
432
433 (
434 delay_step * position as u32,
435 position,
436 positions_moved,
437 preceding_disconnected,
438 )
439 }
440
441 fn override_by_max_submit_position_settings(
447 &self,
448 latency: Duration,
449 mut position: usize,
450 ) -> (Duration, usize) {
451 if let Some(max_submit_position) = self.max_submit_position {
453 position = std::cmp::min(position, max_submit_position);
454 }
455
456 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
457 (delay_step, position)
458 }
459
460 fn submission_position(
470 &self,
471 committee: &Committee,
472 tx_digest: &TransactionDigest,
473 ) -> (usize, usize, usize) {
474 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
475
476 self.check_submission_wrt_connectivity_and_scores(positions)
477 }
478
479 fn check_submission_wrt_connectivity_and_scores(
506 &self,
507 positions: Vec<AuthorityName>,
508 ) -> (usize, usize, usize) {
509 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
510 if low_scoring_authorities.get(&self.authority).is_some() {
511 return (positions.len(), 0, 0);
512 }
513 let initial_position = get_position_in_list(self.authority, positions.clone());
514 let mut preceding_disconnected = 0;
515 let mut before_our_position = true;
516
517 let filtered_positions: Vec<_> = positions
518 .into_iter()
519 .filter(|authority| {
520 let keep = self.authority == *authority; if keep {
522 before_our_position = false;
523 }
524
525 let connected = self
527 .connection_monitor_status
528 .check_connection(&self.authority, authority)
529 .unwrap_or(ConnectionStatus::Disconnected)
530 == ConnectionStatus::Connected;
531 if !connected && before_our_position {
532 preceding_disconnected += 1; }
534
535 let high_scoring = low_scoring_authorities.get(authority).is_none();
537
538 keep || (connected && high_scoring)
539 })
540 .collect();
541
542 let position = get_position_in_list(self.authority, filtered_positions);
543
544 (
545 position,
546 initial_position - position,
547 preceding_disconnected,
548 )
549 }
550
551 pub fn submit(
562 self: &Arc<Self>,
563 transaction: ConsensusTransaction,
564 lock: Option<&RwLockReadGuard<ReconfigState>>,
565 epoch_store: &Arc<AuthorityPerEpochStore>,
566 ) -> IotaResult<JoinHandle<()>> {
567 self.submit_batch(&[transaction], lock, epoch_store)
568 }
569
570 pub fn submit_batch(
571 self: &Arc<Self>,
572 transactions: &[ConsensusTransaction],
573 lock: Option<&RwLockReadGuard<ReconfigState>>,
574 epoch_store: &Arc<AuthorityPerEpochStore>,
575 ) -> IotaResult<JoinHandle<()>> {
576 if transactions.len() > 1 {
577 for transaction in transactions {
581 fp_ensure!(
582 matches!(
583 transaction.kind,
584 ConsensusTransactionKind::CertifiedTransaction(_)
585 ),
586 IotaError::InvalidTxKindInSoftBundle
587 );
588 }
589 }
590
591 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
592 Ok(self.submit_unchecked(transactions, epoch_store))
593 }
594
595 pub fn check_limits(&self) -> bool {
598 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
600 > self.max_pending_transactions
601 {
602 return false;
603 }
604 self.submit_semaphore.available_permits() > 0
606 }
607
608 pub(crate) fn check_consensus_overload(&self) -> IotaResult {
609 fp_ensure!(
610 self.check_limits(),
611 IotaError::TooManyTransactionsPendingConsensus
612 );
613 Ok(())
614 }
615
616 fn submit_unchecked(
617 self: &Arc<Self>,
618 transactions: &[ConsensusTransaction],
619 epoch_store: &Arc<AuthorityPerEpochStore>,
620 ) -> JoinHandle<()> {
621 let async_stage = self
624 .clone()
625 .submit_and_wait(transactions.to_vec(), epoch_store.clone());
626 let join_handle = spawn_monitored_task!(async_stage);
629 join_handle
630 }
631
632 async fn submit_and_wait(
633 self: Arc<Self>,
634 transactions: Vec<ConsensusTransaction>,
635 epoch_store: Arc<AuthorityPerEpochStore>,
636 ) {
637 epoch_store
651 .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
652 .await
653 .ok(); }
656
657 async fn submit_and_wait_inner(
658 self: Arc<Self>,
659 transactions: Vec<ConsensusTransaction>,
660 epoch_store: &Arc<AuthorityPerEpochStore>,
661 ) {
662 if transactions.is_empty() {
663 return;
664 }
665
666 let is_soft_bundle = transactions.len() > 1;
673
674 let mut transaction_keys = Vec::new();
675
676 for transaction in &transactions {
677 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
678 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
679 epoch_store.record_epoch_pending_certs_process_time_metric();
680 }
681
682 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
683 transaction_keys.push(transaction_key);
684 }
685 let tx_type = if !is_soft_bundle {
686 classify(&transactions[0])
687 } else {
688 "soft_bundle"
689 };
690
691 let mut guard = InflightDropGuard::acquire(&self, tx_type);
692
693 let (await_submit, position, positions_moved, preceding_disconnected) =
695 self.await_submit_delay(epoch_store.committee(), &transactions[..]);
696
697 let processed_via_consensus_or_checkpoint =
700 self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
701 pin_mut!(processed_via_consensus_or_checkpoint);
702
703 let processed_waiter = tokio::select! {
704 _ = await_submit => Some(processed_via_consensus_or_checkpoint),
706
707 _ = epoch_store.user_certs_closed_notify() => {
709 warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
710 Some(processed_via_consensus_or_checkpoint)
711 }
712
713 _ = &mut processed_via_consensus_or_checkpoint => {
715 None
716 }
717 };
718
719 let _monitor = if !is_soft_bundle
721 && matches!(
722 transactions[0].kind,
723 ConsensusTransactionKind::EndOfPublish(_)
724 | ConsensusTransactionKind::CapabilityNotificationV1(_)
725 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
726 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
727 ) {
728 let transaction_keys = transaction_keys.clone();
729 Some(CancelOnDrop(spawn_monitored_task!(async {
730 let mut i = 0u64;
731 loop {
732 i += 1;
733 const WARN_DELAY_S: u64 = 30;
734 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
735 let total_wait = i * WARN_DELAY_S;
736 warn!(
737 "Still waiting {} seconds for transactions {:?} to commit in consensus",
738 total_wait, transaction_keys
739 );
740 }
741 })))
742 } else {
743 None
744 };
745
746 if let Some(processed_waiter) = processed_waiter {
747 debug!("Submitting {:?} to consensus", transaction_keys);
748
749 guard.position = Some(position);
752 guard.positions_moved = Some(positions_moved);
753 guard.preceding_disconnected = Some(preceding_disconnected);
754
755 let _permit: SemaphorePermit = self
756 .submit_semaphore
757 .acquire()
758 .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
759 .await
760 .expect("Consensus adapter does not close semaphore");
761 let _in_flight_submission_guard =
762 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
763
764 let submit_inner = async {
768 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
769
770 loop {
771 let status_waiter = self
774 .submit_inner(
775 &transactions,
776 epoch_store,
777 &transaction_keys,
778 tx_type,
779 is_soft_bundle,
780 )
781 .await;
782
783 match status_waiter.await {
784 Ok(BlockStatusInternal::Sequenced) => {
785 self.metrics
786 .sequencing_certificate_status
787 .with_label_values(&[tx_type, "sequenced"])
788 .inc();
789 trace!(
792 "Transaction {transaction_keys:?} has been sequenced by consensus."
793 );
794 break;
795 }
796 Ok(BlockStatusInternal::GarbageCollected) => {
797 self.metrics
798 .sequencing_certificate_status
799 .with_label_values(&[tx_type, "garbage_collected"])
800 .inc();
801 debug!(
809 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
810 );
811 time::sleep(RETRY_DELAY_STEP).await;
812 continue;
813 }
814 Err(err) => {
815 warn!(
816 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
817 err
818 );
819 time::sleep(RETRY_DELAY_STEP).await;
820 continue;
821 }
822 }
823 }
824 };
825
826 guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
827 Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
828 Either::Right(((), processed_waiter)) => {
829 debug!("Submitted {transaction_keys:?} to consensus");
830 processed_waiter.await
831 }
832 };
833 }
834 debug!("{transaction_keys:?} processed by consensus");
835
836 let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
837 epoch_store
838 .remove_pending_consensus_transactions(&consensus_keys)
839 .expect("Storage error when removing consensus transaction");
840
841 let is_user_tx = is_soft_bundle
842 || matches!(
843 transactions[0].kind,
844 ConsensusTransactionKind::CertifiedTransaction(_)
845 );
846 let send_end_of_publish = if is_user_tx {
847 if epoch_store
857 .get_reconfig_state_read_lock_guard()
858 .is_reject_user_certs()
859 {
860 let pending_count = epoch_store.pending_consensus_certificates_count();
861 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
862 pending_count == 0 } else {
864 false
865 }
866 } else {
867 false
868 };
869 if send_end_of_publish {
870 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
872 if let Err(err) = self.submit(
873 ConsensusTransaction::new_end_of_publish(self.authority),
874 None,
875 epoch_store,
876 ) {
877 warn!("Error when sending end of publish message: {:?}", err);
878 }
879 }
880 self.metrics
881 .sequencing_certificate_success
882 .with_label_values(&[tx_type])
883 .inc();
884 }
885
886 async fn submit_inner(
887 self: &Arc<Self>,
888 transactions: &[ConsensusTransaction],
889 epoch_store: &Arc<AuthorityPerEpochStore>,
890 transaction_keys: &[SequencedConsensusTransactionKey],
891 tx_type: &str,
892 is_soft_bundle: bool,
893 ) -> BlockStatusReceiver {
894 let ack_start = Instant::now();
895 let mut retries: u32 = 0;
896
897 let status_waiter = loop {
898 match self
899 .consensus_client
900 .submit(transactions, epoch_store)
901 .await
902 {
903 Err(err) => {
904 if retries > 30
907 || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
908 {
909 warn!(
910 "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
911 );
912 }
913 self.metrics
914 .sequencing_certificate_failures
915 .with_label_values(&[tx_type])
916 .inc();
917 retries += 1;
918
919 if !is_soft_bundle && transactions[0].kind.is_dkg() {
920 time::sleep(Duration::from_millis(100)).await;
923 } else {
924 time::sleep(Duration::from_secs(10)).await;
925 };
926 }
927 Ok(status_waiter) => {
928 break status_waiter;
929 }
930 }
931 };
932
933 let bucket = match retries {
937 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
939 21..=50 => "between_20_and_50".to_string(),
940 51..=100 => "between_50_and_100".to_string(),
941 _ => "over_100".to_string(),
942 };
943
944 self.metrics
945 .sequencing_acknowledge_latency
946 .with_label_values(&[bucket.as_str(), tx_type])
947 .observe(ack_start.elapsed().as_secs_f64());
948
949 status_waiter
950 }
951
952 async fn await_consensus_or_checkpoint(
957 self: &Arc<Self>,
958 transaction_keys: Vec<SequencedConsensusTransactionKey>,
959 epoch_store: &Arc<AuthorityPerEpochStore>,
960 ) -> ProcessedMethod {
961 let notifications = FuturesUnordered::new();
962 for transaction_key in transaction_keys {
963 let transaction_digests = if let SequencedConsensusTransactionKey::External(
964 ConsensusTransactionKey::Certificate(digest),
965 ) = transaction_key
966 {
967 vec![digest]
968 } else {
969 vec![]
970 };
971
972 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
973 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
974 ) = transaction_key
975 {
976 Either::Left(
981 self.checkpoint_store
982 .notify_read_synced_checkpoint(checkpoint_sequence_number),
983 )
984 } else {
985 Either::Right(future::pending())
986 };
987
988 notifications.push(async move {
995 tokio::select! {
996 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
997 processed.expect("Storage error when waiting for consensus message processed");
998 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
999 return ProcessedMethod::Consensus;
1000 },
1001 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
1002 processed.expect("Storage error when waiting for transaction executed in checkpoint");
1003 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
1004 }
1005 _ = checkpoint_synced_future => {
1006 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
1007 }
1008 }
1009 ProcessedMethod::Checkpoint
1010 });
1011 }
1012
1013 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
1014 for method in processed_methods {
1015 if method == ProcessedMethod::Checkpoint {
1016 return ProcessedMethod::Checkpoint;
1017 }
1018 }
1019 ProcessedMethod::Consensus
1020 }
1021}
1022
1023impl CheckConnection for ConnectionMonitorStatus {
1024 fn check_connection(
1025 &self,
1026 ourself: &AuthorityName,
1027 authority: &AuthorityName,
1028 ) -> Option<ConnectionStatus> {
1029 if ourself == authority {
1030 return Some(ConnectionStatus::Connected);
1031 }
1032
1033 let mapping = self.authority_names_to_peer_ids.load_full();
1034 let peer_id = match mapping.get(authority) {
1035 Some(p) => p,
1036 None => {
1037 warn!(
1038 "failed to find peer {:?} in connection monitor listener",
1039 authority
1040 );
1041 return None;
1042 }
1043 };
1044
1045 let res = match self.connection_statuses.try_get(peer_id) {
1046 TryResult::Present(c) => Some(c.value().clone()),
1047 TryResult::Absent => None,
1048 TryResult::Locked => {
1049 Some(ConnectionStatus::Disconnected)
1051 }
1052 };
1053 res
1054 }
1055 fn update_mapping_for_epoch(
1056 &self,
1057 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1058 ) {
1059 self.authority_names_to_peer_ids
1060 .swap(Arc::new(authority_names_to_peer_ids));
1061 }
1062}
1063
1064impl CheckConnection for ConnectionMonitorStatusForTests {
1065 fn check_connection(
1066 &self,
1067 _ourself: &AuthorityName,
1068 _authority: &AuthorityName,
1069 ) -> Option<ConnectionStatus> {
1070 Some(ConnectionStatus::Connected)
1071 }
1072 fn update_mapping_for_epoch(
1073 &self,
1074 _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1075 ) {
1076 }
1077}
1078
1079pub fn get_position_in_list(
1080 search_authority: AuthorityName,
1081 positions: Vec<AuthorityName>,
1082) -> usize {
1083 positions
1084 .into_iter()
1085 .find_position(|authority| *authority == search_authority)
1086 .expect("Couldn't find ourselves in shuffled committee")
1087 .0
1088}
1089
1090impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1091 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1096 let send_end_of_publish = {
1097 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1098 if !reconfig_guard.should_accept_user_certs() {
1099 return;
1101 }
1102 let pending_count = epoch_store.pending_consensus_certificates_count();
1103 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
1104 let send_end_of_publish = pending_count == 0;
1105 epoch_store.close_user_certs(reconfig_guard);
1106 send_end_of_publish
1107 };
1109 if send_end_of_publish {
1110 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1111 if let Err(err) = self.submit(
1112 ConsensusTransaction::new_end_of_publish(self.authority),
1113 None,
1114 epoch_store,
1115 ) {
1116 warn!("Error when sending end of publish message: {:?}", err);
1117 }
1118 }
1119 }
1120}
1121
1122struct CancelOnDrop<T>(JoinHandle<T>);
1123
1124impl<T> Deref for CancelOnDrop<T> {
1125 type Target = JoinHandle<T>;
1126
1127 fn deref(&self) -> &Self::Target {
1128 &self.0
1129 }
1130}
1131
1132impl<T> Drop for CancelOnDrop<T> {
1133 fn drop(&mut self) {
1134 self.0.abort();
1135 }
1136}
1137
/// Tracks one transaction (or soft bundle) while the adapter processes it.
/// In-flight accounting is taken in `acquire` and released, together with
/// metric reporting, when the guard is dropped.
struct InflightDropGuard<'a> {
    /// Adapter whose counters and metrics are updated.
    adapter: &'a ConsensusAdapter,
    /// Acquisition time, used to measure sequencing latency.
    start: Instant,
    /// Submission position; `None` if the transaction was never submitted.
    position: Option<usize>,
    /// Positions moved by filtering; set only when submitted.
    positions_moved: Option<usize>,
    /// Disconnected authorities ahead of us; set only when submitted.
    preceding_disconnected: Option<usize>,
    /// Metric label classifying the transaction.
    tx_type: &'static str,
    /// How the transaction was observed as processed.
    processed_method: ProcessedMethod,
}
1148
/// How a submitted transaction was observed as processed.
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    /// Processed by consensus.
    Consensus,
    /// Executed in a checkpoint, or made unnecessary by a synced checkpoint.
    Checkpoint,
}
1154
1155impl<'a> InflightDropGuard<'a> {
1156 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1157 adapter
1158 .num_inflight_transactions
1159 .fetch_add(1, Ordering::SeqCst);
1160 adapter
1161 .metrics
1162 .sequencing_certificate_inflight
1163 .with_label_values(&[tx_type])
1164 .inc();
1165 adapter
1166 .metrics
1167 .sequencing_certificate_attempt
1168 .with_label_values(&[tx_type])
1169 .inc();
1170 Self {
1171 adapter,
1172 start: Instant::now(),
1173 position: None,
1174 positions_moved: None,
1175 preceding_disconnected: None,
1176 tx_type,
1177 processed_method: ProcessedMethod::Consensus,
1178 }
1179 }
1180}
1181
1182impl Drop for InflightDropGuard<'_> {
1183 fn drop(&mut self) {
1184 self.adapter
1185 .num_inflight_transactions
1186 .fetch_sub(1, Ordering::SeqCst);
1187 self.adapter
1188 .metrics
1189 .sequencing_certificate_inflight
1190 .with_label_values(&[self.tx_type])
1191 .dec();
1192
1193 let position = if let Some(position) = self.position {
1194 self.adapter
1195 .metrics
1196 .sequencing_certificate_authority_position
1197 .observe(position as f64);
1198 position.to_string()
1199 } else {
1200 "not_submitted".to_string()
1201 };
1202
1203 if let Some(positions_moved) = self.positions_moved {
1204 self.adapter
1205 .metrics
1206 .sequencing_certificate_positions_moved
1207 .observe(positions_moved as f64);
1208 };
1209
1210 if let Some(preceding_disconnected) = self.preceding_disconnected {
1211 self.adapter
1212 .metrics
1213 .sequencing_certificate_preceding_disconnected
1214 .observe(preceding_disconnected as f64);
1215 };
1216
1217 let latency = self.start.elapsed();
1218 let processed_method = match self.processed_method {
1219 ProcessedMethod::Consensus => "processed_via_consensus",
1220 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1221 };
1222 self.adapter
1223 .metrics
1224 .sequencing_certificate_latency
1225 .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1226 .observe(latency.as_secs_f64());
1227
1228 if self.position == Some(0) {
1234 let sampled = matches!(
1237 self.tx_type,
1238 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1239 );
1240 if sampled && self.processed_method == ProcessedMethod::Consensus {
1243 self.adapter.latency_observer.report(latency);
1244 }
1245 }
1246 }
1247}
1248
1249impl SubmitToConsensus for Arc<ConsensusAdapter> {
1250 fn submit_to_consensus(
1251 &self,
1252 transactions: &[ConsensusTransaction],
1253 epoch_store: &Arc<AuthorityPerEpochStore>,
1254 ) -> IotaResult {
1255 self.submit_batch(transactions, None, epoch_store)
1256 .map(|_| ())
1257 }
1258}
1259
1260pub fn position_submit_certificate(
1261 committee: &Committee,
1262 ourselves: &AuthorityName,
1263 tx_digest: &TransactionDigest,
1264) -> usize {
1265 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1266 get_position_in_list(*ourselves, validators)
1267}
1268
#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        checkpoints::CheckpointStore,
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        mysticeti_adapter::LazyMysticetiClient,
    };

    /// Builds a test committee of `size` authorities.
    ///
    /// Each authority gets a key pair drawn from `rng` and a random stake in
    /// `0..10`.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        let authorities = (0..size)
            .map(|_| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect::<Vec<_>>();
        Committee::new_for_testing_with_normalized_voting_power(
            0,
            authorities.into_iter().collect(),
        )
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // The two `Some(..)` arguments override the computed submission
        // position / delay step; the assertions below expect position 1 and a
        // 2s delay.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        let tx_digest = TransactionDigest::generate(&mut rng);

        // `ConnectionMonitorStatusForTests` presumably reports every peer as
        // connected, so our position must not have been moved.
        // Note: `!positions_moved > 0` would be bitwise NOT on an integer and
        // always true; compare against zero explicitly instead.
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Without overrides, the natural position (7) is used.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyMysticetiClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);

        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }

    #[test]
    fn test_position_submit_certificate() {
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        const NUM_TEST_TRANSACTIONS: usize = 1000;

        // For every digest, positions must be in range and exactly one member
        // must be at position 0.
        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    assert!(!zero_found);
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}