1use std::{
6 collections::HashMap,
7 future::Future,
8 ops::Deref,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use dashmap::{DashMap, try_result::TryResult};
18use futures::{
19 FutureExt, StreamExt,
20 future::{self, Either, select},
21 pin_mut,
22 stream::FuturesUnordered,
23};
24use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
25use iota_simulator::anemo::PeerId;
26use iota_types::{
27 base_types::{AuthorityName, TransactionDigest},
28 committee::Committee,
29 error::{IotaError, IotaResult},
30 fp_ensure,
31 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
32};
33use itertools::Itertools;
34use parking_lot::RwLockReadGuard;
35use prometheus::{
36 Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
37 register_histogram_vec_with_registry, register_histogram_with_registry,
38 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
39 register_int_gauge_with_registry,
40};
41use tokio::{
42 sync::{Semaphore, SemaphorePermit, oneshot},
43 task::JoinHandle,
44 time::{
45 Duration, {self},
46 },
47};
48use tracing::{debug, info, trace, warn};
49
50use crate::{
51 authority::authority_per_epoch_store::AuthorityPerEpochStore,
52 checkpoints::CheckpointStore,
53 connection_monitor::ConnectionStatus,
54 consensus_handler::{SequencedConsensusTransactionKey, classify},
55 epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
56 metrics::LatencyObserver,
57};
58
59#[cfg(test)]
60#[path = "unit_tests/consensus_tests.rs"]
61pub mod consensus_tests;
62
/// Histogram bucket boundaries shared by the submission-position metrics
/// (authority position, positions moved, preceding disconnected).
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0.0, 1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 50.0, 100.0, 150.0, 200.0,
];
66
/// Prometheus metrics recorded by `ConsensusAdapter` while submitting
/// transactions to consensus and waiting for them to be processed.
pub struct ConsensusAdapterMetrics {
    /// Attempts to sequence, labelled by `tx_type`.
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Successfully sequenced submissions, labelled by `tx_type`.
    pub sequencing_certificate_success: IntCounterVec,
    /// Failed submissions to the consensus client, labelled by `tx_type`.
    pub sequencing_certificate_failures: IntCounterVec,
    /// Block status reported by consensus, labelled by `tx_type` and `status`.
    pub sequencing_certificate_status: IntCounterVec,
    /// Currently in-flight submissions, labelled by `tx_type`.
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Time until the consensus client accepted a submission (seconds),
    /// labelled by retry bucket and `tx_type`.
    pub sequencing_acknowledge_latency: HistogramVec,
    /// End-to-end sequencing latency (seconds), labelled by position,
    /// `tx_type` and processing method.
    pub sequencing_certificate_latency: HistogramVec,
    /// Our submission position at the time of submission.
    pub sequencing_certificate_authority_position: Histogram,
    /// Number of authorities ahead of us that were filtered out.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Number of authorities ahead of us that were disconnected.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Processed transactions, labelled by `source`.
    pub sequencing_certificate_processed: IntCounterVec,
    /// Requests currently waiting for a submission permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Transactions submitted to consensus and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Estimated consensus latency in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Resubmission interval (delay step) in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
}
85
86impl ConsensusAdapterMetrics {
87 pub fn new(registry: &Registry) -> Self {
88 Self {
89 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
90 "sequencing_certificate_attempt",
91 "Counts the number of certificates the validator attempts to sequence.",
92 &["tx_type"],
93 registry,
94 )
95 .unwrap(),
96 sequencing_certificate_success: register_int_counter_vec_with_registry!(
97 "sequencing_certificate_success",
98 "Counts the number of successfully sequenced certificates.",
99 &["tx_type"],
100 registry,
101 )
102 .unwrap(),
103 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
104 "sequencing_certificate_failures",
105 "Counts the number of sequenced certificates that failed other than by timeout.",
106 &["tx_type"],
107 registry,
108 )
109 .unwrap(),
110 sequencing_certificate_status: register_int_counter_vec_with_registry!(
111 "sequencing_certificate_status",
112 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
113 &["tx_type", "status"],
114 registry,
115 )
116 .unwrap(),
117 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
118 "sequencing_certificate_inflight",
119 "The inflight requests to sequence certificates.",
120 &["tx_type"],
121 registry,
122 )
123 .unwrap(),
124 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
125 "sequencing_acknowledge_latency",
126 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
127 &["retry", "tx_type"],
128 LATENCY_SEC_BUCKETS.to_vec(),
129 registry,
130 )
131 .unwrap(),
132 sequencing_certificate_latency: register_histogram_vec_with_registry!(
133 "sequencing_certificate_latency",
134 "The latency for sequencing a certificate.",
135 &["position", "tx_type", "processed_method"],
136 LATENCY_SEC_BUCKETS.to_vec(),
137 registry,
138 )
139 .unwrap(),
140 sequencing_certificate_authority_position: register_histogram_with_registry!(
141 "sequencing_certificate_authority_position",
142 "The position of the authority when submitted a certificate to consensus.",
143 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
144 registry,
145 )
146 .unwrap(),
147 sequencing_certificate_positions_moved: register_histogram_with_registry!(
148 "sequencing_certificate_positions_moved",
149 "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
150 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
151 registry,
152 )
153 .unwrap(),
154 sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
155 "sequencing_certificate_preceding_disconnected",
156 "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
157 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
158 registry,
159 )
160 .unwrap(),
161 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
162 "sequencing_certificate_processed",
163 "The number of certificates that have been processed either by consensus or checkpoint.",
164 &["source"],
165 registry
166 )
167 .unwrap(),
168 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
169 "sequencing_in_flight_semaphore_wait",
170 "How many requests are blocked on submit_permit.",
171 registry,
172 )
173 .unwrap(),
174 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
175 "sequencing_in_flight_submissions",
176 "Number of transactions submitted to local consensus instance and not yet sequenced",
177 registry,
178 )
179 .unwrap(),
180 sequencing_estimated_latency: register_int_gauge_with_registry!(
181 "sequencing_estimated_latency",
182 "Consensus latency estimated by consensus adapter in milliseconds",
183 registry,
184 )
185 .unwrap(),
186 sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
187 "sequencing_resubmission_interval_ms",
188 "Resubmission interval used by consensus adapter in milliseconds",
189 registry,
190 )
191 .unwrap(),
192 }
193 }
194
195 pub fn new_test() -> Self {
196 Self::new(&Registry::default())
197 }
198}
199
200pub enum BlockStatusInternal {
202 Sequenced,
203 GarbageCollected,
204}
205
206impl From<starfish_core::BlockStatus> for BlockStatusInternal {
207 fn from(status: starfish_core::BlockStatus) -> Self {
208 match status {
209 starfish_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
210 starfish_core::BlockStatus::GarbageCollected(_) => {
211 BlockStatusInternal::GarbageCollected
212 }
213 }
214 }
215}
216
/// One-shot receiver that yields the consensus status of a submitted block.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatusInternal>;

/// Synchronous interface for submitting transactions to consensus.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}

/// Asynchronous client that `ConsensusAdapter` uses to hand transactions to
/// the consensus engine.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions`. On success it returns a receiver that later
    /// reports whether the containing block was sequenced or garbage collected.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
237
/// Submits transactions to consensus on behalf of this validator and tracks
/// them until they are processed.
pub struct ConsensusAdapter {
    /// Client used to hand transactions to the consensus engine.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Used to wait for synced checkpoints that cover checkpoint-signature
    /// transactions.
    checkpoint_store: Arc<CheckpointStore>,
    /// This validator's own authority name.
    authority: AuthorityName,
    /// Limit on `num_inflight_transactions` enforced by `check_limits`.
    max_pending_transactions: usize,
    /// Submissions currently in flight. Incremented and decremented by
    /// `InflightDropGuard`.
    num_inflight_transactions: AtomicU64,
    /// Optional cap on the submission position used to compute the delay.
    max_submit_position: Option<usize>,
    /// Optional fixed delay step that replaces the latency-derived one.
    submit_delay_step_override: Option<Duration>,
    /// Connectivity source consulted when computing the submission position.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// Swappable handle to the map of authorities treated as low scoring.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// Prometheus metrics.
    metrics: ConsensusAdapterMetrics,
    /// Bounds concurrent local submissions. Its size is
    /// `max_pending_local_submissions` from `new`.
    submit_semaphore: Semaphore,
    /// Consensus latency estimate used to size submission delays.
    latency_observer: LatencyObserver,
}
268
/// Reports connectivity between this validator and other authorities.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status of `authority` as seen from `ourself`,
    /// or `None` when it is unknown.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Replaces the authority-name to peer-id mapping used for lookups.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}

/// `CheckConnection` implementation backed by a shared map of per-peer
/// connection statuses.
pub struct ConnectionMonitorStatus {
    /// Latest known connection status per network peer.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// Authority name to peer id mapping, replaced via `update_mapping_for_epoch`.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}

/// `CheckConnection` stub that reports every authority as connected.
pub struct ConnectionMonitorStatusForTests {}
286
287impl ConsensusAdapter {
288 pub fn new(
290 consensus_client: Arc<dyn ConsensusClient>,
291 checkpoint_store: Arc<CheckpointStore>,
292 authority: AuthorityName,
293 connection_monitor_status: Arc<dyn CheckConnection>,
294 max_pending_transactions: usize,
295 max_pending_local_submissions: usize,
296 max_submit_position: Option<usize>,
297 submit_delay_step_override: Option<Duration>,
298 metrics: ConsensusAdapterMetrics,
299 ) -> Self {
300 let num_inflight_transactions = Default::default();
301 let low_scoring_authorities =
302 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
303 Self {
304 consensus_client,
305 checkpoint_store,
306 authority,
307 max_pending_transactions,
308 max_submit_position,
309 submit_delay_step_override,
310 num_inflight_transactions,
311 connection_monitor_status,
312 low_scoring_authorities,
313 metrics,
314 submit_semaphore: Semaphore::new(max_pending_local_submissions),
315 latency_observer: LatencyObserver::new(),
316 }
317 }
318
319 pub fn swap_low_scoring_authorities(
320 &self,
321 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
322 ) {
323 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
324 }
325
326 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
327 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
333
334 let is_pending_consensus_certificates_empty =
335 if epoch_store.protocol_config().enable_pcool_flow() {
336 true
339 } else {
340 epoch_store.pending_consensus_certificates_empty()
341 };
342
343 if epoch_store
344 .get_reconfig_state_read_lock_guard()
345 .is_reject_user_certs()
346 && is_pending_consensus_certificates_empty
347 {
348 if !recovered
350 .iter()
351 .any(ConsensusTransaction::is_end_of_publish)
352 {
353 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
362 }
363 }
364 debug!(
365 "Submitting {:?} recovered pending consensus transactions to consensus",
366 recovered.len()
367 );
368 for transaction in recovered {
369 if transaction.is_end_of_publish() {
370 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
371 }
372 self.submit_unchecked(&[transaction], epoch_store);
373 }
374 }
375
376 fn await_submit_delay(
377 &self,
378 committee: &Committee,
379 transactions: &[ConsensusTransaction],
380 ) -> (impl Future<Output = ()>, usize, usize, usize) {
381 let min_digest = transactions
383 .iter()
384 .filter_map(|tx| match &tx.kind {
385 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
386 Some(certificate.digest())
387 }
388 ConsensusTransactionKind::UserTransactionV1(_) => {
389 None
392 }
393 _ => None,
394 })
395 .min();
396
397 let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
398 Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
399 _ => (Duration::ZERO, 0, 0, 0),
400 };
401 (
402 tokio::time::sleep(duration),
403 position,
404 positions_moved,
405 preceding_disconnected,
406 )
407 }
408
409 fn await_submit_delay_user_transaction(
410 &self,
411 committee: &Committee,
412 tx_digest: &TransactionDigest,
413 ) -> (Duration, usize, usize, usize) {
414 let (position, positions_moved, preceding_disconnected) =
415 self.submission_position(committee, tx_digest);
416
417 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
419 const MAX_LATENCY: Duration = Duration::from_secs(3);
420
421 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
422 self.metrics
423 .sequencing_estimated_latency
424 .set(latency.as_millis() as i64);
425
426 let latency = std::cmp::max(latency, MIN_LATENCY);
427 let latency = std::cmp::min(latency, MAX_LATENCY);
428 let latency = latency * 2;
429 let (delay_step, position) =
430 self.override_by_max_submit_position_settings(latency, position);
431
432 self.metrics
433 .sequencing_resubmission_interval_ms
434 .set(delay_step.as_millis() as i64);
435
436 (
437 delay_step * position as u32,
438 position,
439 positions_moved,
440 preceding_disconnected,
441 )
442 }
443
444 fn override_by_max_submit_position_settings(
450 &self,
451 latency: Duration,
452 mut position: usize,
453 ) -> (Duration, usize) {
454 if let Some(max_submit_position) = self.max_submit_position {
456 position = std::cmp::min(position, max_submit_position);
457 }
458
459 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
460 (delay_step, position)
461 }
462
463 fn submission_position(
473 &self,
474 committee: &Committee,
475 tx_digest: &TransactionDigest,
476 ) -> (usize, usize, usize) {
477 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
478
479 self.check_submission_wrt_connectivity_and_scores(positions)
480 }
481
482 fn check_submission_wrt_connectivity_and_scores(
509 &self,
510 positions: Vec<AuthorityName>,
511 ) -> (usize, usize, usize) {
512 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
513 if low_scoring_authorities.get(&self.authority).is_some() {
514 return (positions.len(), 0, 0);
515 }
516 let initial_position = get_position_in_list(self.authority, positions.clone());
517 let mut preceding_disconnected = 0;
518 let mut before_our_position = true;
519
520 let filtered_positions: Vec<_> = positions
521 .into_iter()
522 .filter(|authority| {
523 let keep = self.authority == *authority; if keep {
525 before_our_position = false;
526 }
527
528 let connected = self
530 .connection_monitor_status
531 .check_connection(&self.authority, authority)
532 .unwrap_or(ConnectionStatus::Disconnected)
533 == ConnectionStatus::Connected;
534 if !connected && before_our_position {
535 preceding_disconnected += 1; }
537
538 let high_scoring = low_scoring_authorities.get(authority).is_none();
540
541 keep || (connected && high_scoring)
542 })
543 .collect();
544
545 let position = get_position_in_list(self.authority, filtered_positions);
546
547 (
548 position,
549 initial_position - position,
550 preceding_disconnected,
551 )
552 }
553
554 pub fn submit(
565 self: &Arc<Self>,
566 transaction: ConsensusTransaction,
567 lock: Option<&RwLockReadGuard<ReconfigState>>,
568 epoch_store: &Arc<AuthorityPerEpochStore>,
569 ) -> IotaResult<JoinHandle<()>> {
570 self.submit_batch(&[transaction], lock, epoch_store)
571 }
572
573 pub fn submit_batch(
574 self: &Arc<Self>,
575 transactions: &[ConsensusTransaction],
576 lock: Option<&RwLockReadGuard<ReconfigState>>,
577 epoch_store: &Arc<AuthorityPerEpochStore>,
578 ) -> IotaResult<JoinHandle<()>> {
579 if transactions.len() > 1 {
580 for transaction in transactions {
585 fp_ensure!(
586 matches!(
587 transaction.kind,
588 ConsensusTransactionKind::CertifiedTransaction(_)
589 | ConsensusTransactionKind::UserTransactionV1(_)
590 ),
591 IotaError::InvalidTxKindInSoftBundle
592 );
593 }
594 }
595
596 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
597
598 Ok(self.submit_unchecked(transactions, epoch_store))
599 }
600
601 pub fn num_inflight_transactions(&self) -> u64 {
603 self.num_inflight_transactions.load(Ordering::Relaxed)
604 }
605
606 pub fn check_limits(&self) -> bool {
609 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
611 > self.max_pending_transactions
612 {
613 return false;
614 }
615 self.submit_semaphore.available_permits() > 0
617 }
618
619 pub(crate) fn check_consensus_overload(&self) -> IotaResult {
620 fp_ensure!(
621 self.check_limits(),
622 IotaError::TooManyTransactionsPendingConsensus
623 );
624 Ok(())
625 }
626
627 fn submit_unchecked(
628 self: &Arc<Self>,
629 transactions: &[ConsensusTransaction],
630 epoch_store: &Arc<AuthorityPerEpochStore>,
631 ) -> JoinHandle<()> {
632 let async_stage = self
635 .clone()
636 .submit_and_wait(transactions.to_vec(), epoch_store.clone());
637 let join_handle = spawn_monitored_task!(async_stage);
640 join_handle
641 }
642
643 async fn submit_and_wait(
644 self: Arc<Self>,
645 transactions: Vec<ConsensusTransaction>,
646 epoch_store: Arc<AuthorityPerEpochStore>,
647 ) {
648 epoch_store
662 .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
663 .await
664 .ok(); }
667
668 async fn submit_and_wait_inner(
669 self: Arc<Self>,
670 transactions: Vec<ConsensusTransaction>,
671 epoch_store: &Arc<AuthorityPerEpochStore>,
672 ) {
673 if transactions.is_empty() {
674 return;
675 }
676
677 let is_soft_bundle = transactions.len() > 1;
681
682 let mut transaction_keys = Vec::new();
683
684 for transaction in &transactions {
685 if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
686 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
687 epoch_store.record_epoch_pending_certs_process_time_metric();
688 }
689
690 let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
691 transaction_keys.push(transaction_key);
692 }
693 let tx_type = if !is_soft_bundle {
694 classify(&transactions[0])
695 } else {
696 "soft_bundle"
697 };
698
699 let mut guard = InflightDropGuard::acquire(&self, tx_type);
700
701 let (await_submit, position, positions_moved, preceding_disconnected) =
703 self.await_submit_delay(epoch_store.committee(), &transactions[..]);
704
705 let processed_via_consensus_or_checkpoint =
708 self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
709 pin_mut!(processed_via_consensus_or_checkpoint);
710
711 let processed_waiter = tokio::select! {
712 _ = await_submit => Some(processed_via_consensus_or_checkpoint),
714
715 _ = epoch_store.user_certs_closed_notify() => {
717 warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
718 Some(processed_via_consensus_or_checkpoint)
719 }
720
721 method = &mut processed_via_consensus_or_checkpoint => {
728 guard.processed_method = method;
729 None
730 }
731 };
732
733 let _monitor = if !is_soft_bundle
735 && matches!(
736 transactions[0].kind,
737 ConsensusTransactionKind::EndOfPublish(_)
738 | ConsensusTransactionKind::CapabilityNotificationV1(_)
739 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
740 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
741 ) {
742 let transaction_keys = transaction_keys.clone();
743 Some(CancelOnDrop(spawn_monitored_task!(async {
744 let mut i = 0u64;
745 loop {
746 i += 1;
747 const WARN_DELAY_S: u64 = 30;
748 tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
749 let total_wait = i * WARN_DELAY_S;
750 warn!(
751 "Still waiting {} seconds for transactions {:?} to commit in consensus",
752 total_wait, transaction_keys
753 );
754 }
755 })))
756 } else {
757 None
758 };
759
760 if let Some(processed_waiter) = processed_waiter {
761 debug!("Submitting {:?} to consensus", transaction_keys);
762
763 guard.position = Some(position);
766 guard.positions_moved = Some(positions_moved);
767 guard.preceding_disconnected = Some(preceding_disconnected);
768
769 let _permit: SemaphorePermit = self
770 .submit_semaphore
771 .acquire()
772 .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
773 .await
774 .expect("Consensus adapter does not close semaphore");
775 let _in_flight_submission_guard =
776 GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);
777
778 let submit_inner = async {
782 const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);
783
784 loop {
785 let status_waiter = self
788 .submit_inner(
789 &transactions,
790 epoch_store,
791 &transaction_keys,
792 tx_type,
793 is_soft_bundle,
794 )
795 .await;
796
797 match status_waiter.await {
798 Ok(BlockStatusInternal::Sequenced) => {
799 self.metrics
800 .sequencing_certificate_status
801 .with_label_values(&[tx_type, "sequenced"])
802 .inc();
803 trace!(
806 "Transaction {transaction_keys:?} has been sequenced by consensus."
807 );
808 break;
809 }
810 Ok(BlockStatusInternal::GarbageCollected) => {
811 self.metrics
812 .sequencing_certificate_status
813 .with_label_values(&[tx_type, "garbage_collected"])
814 .inc();
815 debug!(
823 "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
824 );
825 time::sleep(RETRY_DELAY_STEP).await;
826 continue;
827 }
828 Err(err) => {
829 warn!(
830 "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
831 err
832 );
833 time::sleep(RETRY_DELAY_STEP).await;
834 continue;
835 }
836 }
837 }
838 };
839
840 guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
841 Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
842 Either::Right(((), processed_waiter)) => {
843 debug!("Submitted {transaction_keys:?} to consensus");
844 processed_waiter.await
845 }
846 };
847 }
848 debug!("{transaction_keys:?} processed by consensus");
849
850 let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
851 epoch_store
852 .remove_pending_consensus_transactions(&consensus_keys)
853 .expect("Storage error when removing consensus transaction");
854
855 let is_user_tx = is_soft_bundle
856 || if epoch_store.protocol_config().enable_pcool_flow() {
857 matches!(
860 transactions[0].kind,
861 ConsensusTransactionKind::UserTransactionV1(_)
862 )
863 } else {
864 matches!(
867 transactions[0].kind,
868 ConsensusTransactionKind::CertifiedTransaction(_)
869 )
870 };
871 let send_end_of_publish = if is_user_tx {
872 if epoch_store.protocol_config().enable_pcool_flow() {
873 false
880 } else {
881 if epoch_store
896 .get_reconfig_state_read_lock_guard()
897 .is_reject_user_certs()
898 {
899 let pending_count = epoch_store.pending_consensus_certificates_count();
900 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
901
902 pending_count == 0 } else {
904 false
905 }
906 }
907 } else {
908 false
909 };
910 if send_end_of_publish {
911 let adapter = self.clone();
917 let epoch_store = epoch_store.clone();
918 spawn_monitored_task!(async move {
919 if epoch_store
920 .within_alive_epoch(adapter.submit_end_of_publish_with_retry(&epoch_store))
921 .await
922 .is_err()
923 {
924 warn!(
925 epoch = ?epoch_store.epoch(),
926 "EndOfPublish submission cancelled: epoch has ended",
927 );
928 }
929 });
930 }
931 self.metrics
932 .sequencing_certificate_success
933 .with_label_values(&[tx_type])
934 .inc();
935 }
936
937 async fn submit_inner(
938 self: &Arc<Self>,
939 transactions: &[ConsensusTransaction],
940 epoch_store: &Arc<AuthorityPerEpochStore>,
941 transaction_keys: &[SequencedConsensusTransactionKey],
942 tx_type: &str,
943 is_soft_bundle: bool,
944 ) -> BlockStatusReceiver {
945 let ack_start = Instant::now();
946 let mut retries: u32 = 0;
947
948 let status_waiter = loop {
949 match self
950 .consensus_client
951 .submit(transactions, epoch_store)
952 .await
953 {
954 Err(err) => {
955 if retries > 30
958 || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
959 {
960 warn!(
961 "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
962 );
963 }
964 self.metrics
965 .sequencing_certificate_failures
966 .with_label_values(&[tx_type])
967 .inc();
968 retries += 1;
969
970 if !is_soft_bundle && transactions[0].kind.is_dkg() {
971 time::sleep(Duration::from_millis(100)).await;
974 } else {
975 time::sleep(Duration::from_secs(10)).await;
976 };
977 }
978 Ok(status_waiter) => {
979 break status_waiter;
980 }
981 }
982 };
983
984 let bucket = match retries {
988 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
990 21..=50 => "between_20_and_50".to_string(),
991 51..=100 => "between_50_and_100".to_string(),
992 _ => "over_100".to_string(),
993 };
994
995 self.metrics
996 .sequencing_acknowledge_latency
997 .with_label_values(&[bucket.as_str(), tx_type])
998 .observe(ack_start.elapsed().as_secs_f64());
999
1000 status_waiter
1001 }
1002
1003 async fn await_consensus_or_checkpoint(
1008 self: &Arc<Self>,
1009 transaction_keys: Vec<SequencedConsensusTransactionKey>,
1010 epoch_store: &Arc<AuthorityPerEpochStore>,
1011 ) -> ProcessedMethod {
1012 let notifications = FuturesUnordered::new();
1013 for transaction_key in transaction_keys {
1014 let transaction_digests = match transaction_key {
1015 SequencedConsensusTransactionKey::External(
1016 ConsensusTransactionKey::Certificate(digest),
1017 )
1018 | SequencedConsensusTransactionKey::External(
1019 ConsensusTransactionKey::UserTransaction(digest),
1020 ) => vec![digest],
1021 _ => vec![],
1022 };
1023
1024 let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
1025 ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
1026 ) = transaction_key
1027 {
1028 Either::Left(
1033 self.checkpoint_store
1034 .notify_read_synced_checkpoint(checkpoint_sequence_number),
1035 )
1036 } else {
1037 Either::Right(future::pending())
1038 };
1039
1040 let dropped_digest = transaction_digests.first().copied();
1047 notifications.push(async move {
1048 tokio::select! {
1049 processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
1050 processed.expect("Storage error when waiting for consensus message processed");
1051 self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
1052 return ProcessedMethod::Consensus;
1053 },
1054 processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
1055 processed.expect("Storage error when waiting for transaction executed in checkpoint");
1056 self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
1057 }
1058 _ = checkpoint_synced_future => {
1059 self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
1060 }
1061 _ = async move {
1062 if let Some(d) = dropped_digest {
1063 let _err = epoch_store.notify_read_dropped_digests(d).await;
1064 } else {
1065 future::pending::<()>().await;
1066 }
1067 } => {
1068 self.metrics.sequencing_certificate_processed.with_label_values(&["dropped"]).inc();
1069 return ProcessedMethod::Dropped;
1070 }
1071 }
1072 ProcessedMethod::Checkpoint
1073 });
1074 }
1075
1076 let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
1077 if processed_methods.contains(&ProcessedMethod::Dropped) {
1078 ProcessedMethod::Dropped
1079 } else if processed_methods.contains(&ProcessedMethod::Checkpoint) {
1080 ProcessedMethod::Checkpoint
1081 } else {
1082 ProcessedMethod::Consensus
1083 }
1084 }
1085
1086 async fn submit_end_of_publish_with_retry(
1097 self: &Arc<Self>,
1098 epoch_store: &Arc<AuthorityPerEpochStore>,
1099 ) {
1100 const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
1101 const MAX_BACKOFF: Duration = Duration::from_secs(10);
1102
1103 info!(
1104 epoch = ?epoch_store.epoch(),
1105 authority = ?self.authority,
1106 "Sending EndOfPublish message to consensus",
1107 );
1108
1109 let mut attempt: u32 = 0;
1110 loop {
1111 match self.submit(
1112 ConsensusTransaction::new_end_of_publish(self.authority),
1113 None,
1114 epoch_store,
1115 ) {
1116 Ok(_) => return,
1117 Err(IotaError::EpochEnded(_)) => {
1118 warn!(
1119 epoch = ?epoch_store.epoch(),
1120 authority = ?self.authority,
1121 "EndOfPublish submission stopped: epoch has ended",
1122 );
1123 return;
1124 }
1125 Err(err) => {
1126 let backoff = (INITIAL_BACKOFF * 2u32.pow(attempt.min(10))).min(MAX_BACKOFF);
1127 warn!(
1128 epoch = ?epoch_store.epoch(),
1129 authority = ?self.authority,
1130 attempt,
1131 "Failed to submit EndOfPublish, retrying in {:?}: {:?}",
1132 backoff,
1133 err,
1134 );
1135 tokio::time::sleep(backoff).await;
1136 attempt = attempt.saturating_add(1);
1137 }
1138 }
1139 }
1140 }
1141}
1142
1143impl CheckConnection for ConnectionMonitorStatus {
1144 fn check_connection(
1145 &self,
1146 ourself: &AuthorityName,
1147 authority: &AuthorityName,
1148 ) -> Option<ConnectionStatus> {
1149 if ourself == authority {
1150 return Some(ConnectionStatus::Connected);
1151 }
1152
1153 let mapping = self.authority_names_to_peer_ids.load_full();
1154 let peer_id = match mapping.get(authority) {
1155 Some(p) => p,
1156 None => {
1157 warn!(
1158 "failed to find peer {:?} in connection monitor listener",
1159 authority
1160 );
1161 return None;
1162 }
1163 };
1164
1165 let res = match self.connection_statuses.try_get(peer_id) {
1166 TryResult::Present(c) => Some(c.value().clone()),
1167 TryResult::Absent => None,
1168 TryResult::Locked => {
1169 Some(ConnectionStatus::Disconnected)
1171 }
1172 };
1173 res
1174 }
1175 fn update_mapping_for_epoch(
1176 &self,
1177 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1178 ) {
1179 self.authority_names_to_peer_ids
1180 .swap(Arc::new(authority_names_to_peer_ids));
1181 }
1182}
1183
1184impl CheckConnection for ConnectionMonitorStatusForTests {
1185 fn check_connection(
1186 &self,
1187 _ourself: &AuthorityName,
1188 _authority: &AuthorityName,
1189 ) -> Option<ConnectionStatus> {
1190 Some(ConnectionStatus::Connected)
1191 }
1192 fn update_mapping_for_epoch(
1193 &self,
1194 _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1195 ) {
1196 }
1197}
1198
1199pub fn get_position_in_list(
1200 search_authority: AuthorityName,
1201 positions: Vec<AuthorityName>,
1202) -> usize {
1203 positions
1204 .into_iter()
1205 .find_position(|authority| *authority == search_authority)
1206 .expect("Couldn't find ourselves in shuffled committee")
1207 .0
1208}
1209
1210impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1211 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1218 let send_end_of_publish = {
1219 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1220 if !reconfig_guard.should_accept_user_certs() {
1221 return;
1223 }
1224
1225 let send_end_of_publish = if epoch_store.protocol_config().enable_pcool_flow() {
1226 debug!(epoch=?epoch_store.epoch(), "Closing epoch in P-COOL mode");
1229
1230 true
1231 } else {
1232 let pending_count = epoch_store.pending_consensus_certificates_count();
1235 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
1236
1237 pending_count == 0 };
1239
1240 epoch_store.close_user_certs(reconfig_guard);
1241
1242 send_end_of_publish
1243 };
1245
1246 if send_end_of_publish {
1247 let adapter = self.clone();
1254 let epoch_store = epoch_store.clone();
1255 spawn_monitored_task!(async move {
1256 if epoch_store
1257 .within_alive_epoch(adapter.submit_end_of_publish_with_retry(&epoch_store))
1258 .await
1259 .is_err()
1260 {
1261 warn!(
1262 epoch = ?epoch_store.epoch(),
1263 "EndOfPublish submission cancelled: epoch has ended",
1264 );
1265 }
1266 });
1267 }
1268 }
1269}
1270
1271struct CancelOnDrop<T>(JoinHandle<T>);
1272
1273impl<T> Deref for CancelOnDrop<T> {
1274 type Target = JoinHandle<T>;
1275
1276 fn deref(&self) -> &Self::Target {
1277 &self.0
1278 }
1279}
1280
1281impl<T> Drop for CancelOnDrop<T> {
1282 fn drop(&mut self) {
1283 self.0.abort();
1284 }
1285}
1286
1287struct InflightDropGuard<'a> {
1289 adapter: &'a ConsensusAdapter,
1290 start: Instant,
1291 position: Option<usize>,
1292 positions_moved: Option<usize>,
1293 preceding_disconnected: Option<usize>,
1294 tx_type: &'static str,
1295 processed_method: ProcessedMethod,
1296}
1297
/// How a transaction tracked by `InflightDropGuard` was finally handled.
///
/// Selects the `processed_method` label of the sequencing latency metric. Only
/// `Consensus` outcomes are fed into the adapter's latency observer.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum ProcessedMethod {
    /// Processed via consensus (the initial value set on acquire).
    Consensus,
    /// Processed via checkpoint instead of consensus.
    Checkpoint,
    /// Dropped without being processed.
    Dropped,
}
1304
1305impl<'a> InflightDropGuard<'a> {
1306 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1307 adapter
1308 .num_inflight_transactions
1309 .fetch_add(1, Ordering::SeqCst);
1310 adapter
1311 .metrics
1312 .sequencing_certificate_inflight
1313 .with_label_values(&[tx_type])
1314 .inc();
1315 adapter
1316 .metrics
1317 .sequencing_certificate_attempt
1318 .with_label_values(&[tx_type])
1319 .inc();
1320 Self {
1321 adapter,
1322 start: Instant::now(),
1323 position: None,
1324 positions_moved: None,
1325 preceding_disconnected: None,
1326 tx_type,
1327 processed_method: ProcessedMethod::Consensus,
1328 }
1329 }
1330}
1331
1332impl Drop for InflightDropGuard<'_> {
1333 fn drop(&mut self) {
1334 self.adapter
1335 .num_inflight_transactions
1336 .fetch_sub(1, Ordering::SeqCst);
1337 self.adapter
1338 .metrics
1339 .sequencing_certificate_inflight
1340 .with_label_values(&[self.tx_type])
1341 .dec();
1342
1343 let position = if let Some(position) = self.position {
1344 self.adapter
1345 .metrics
1346 .sequencing_certificate_authority_position
1347 .observe(position as f64);
1348 position.to_string()
1349 } else {
1350 "not_submitted".to_string()
1351 };
1352
1353 if let Some(positions_moved) = self.positions_moved {
1354 self.adapter
1355 .metrics
1356 .sequencing_certificate_positions_moved
1357 .observe(positions_moved as f64);
1358 };
1359
1360 if let Some(preceding_disconnected) = self.preceding_disconnected {
1361 self.adapter
1362 .metrics
1363 .sequencing_certificate_preceding_disconnected
1364 .observe(preceding_disconnected as f64);
1365 };
1366
1367 let latency = self.start.elapsed();
1368 let processed_method = match self.processed_method {
1369 ProcessedMethod::Consensus => "processed_via_consensus",
1370 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1371 ProcessedMethod::Dropped => "dropped",
1372 };
1373 self.adapter
1374 .metrics
1375 .sequencing_certificate_latency
1376 .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1377 .observe(latency.as_secs_f64());
1378
1379 if self.position == Some(0) {
1385 let sampled = matches!(
1388 self.tx_type,
1389 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1390 );
1391 if sampled && self.processed_method == ProcessedMethod::Consensus {
1395 self.adapter.latency_observer.report(latency);
1396 }
1397 }
1398 }
1399}
1400
1401impl SubmitToConsensus for Arc<ConsensusAdapter> {
1402 fn submit_to_consensus(
1403 &self,
1404 transactions: &[ConsensusTransaction],
1405 epoch_store: &Arc<AuthorityPerEpochStore>,
1406 ) -> IotaResult {
1407 self.submit_batch(transactions, None, epoch_store)
1408 .map(|_| ())
1409 }
1410}
1411
1412pub fn position_submit_certificate(
1413 committee: &Committee,
1414 ourselves: &AuthorityName,
1415 tx_digest: &TransactionDigest,
1416) -> usize {
1417 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1418 get_position_in_list(*ourselves, validators)
1419}
1420
#[cfg(test)]
mod adapter_tests {
    use std::{sync::Arc, time::Duration};

    use fastcrypto::traits::KeyPair;
    use iota_types::{
        base_types::TransactionDigest,
        committee::Committee,
        crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
    };
    use rand::{Rng, SeedableRng, rngs::StdRng};

    use super::position_submit_certificate;
    use crate::{
        checkpoints::CheckpointStore,
        consensus_adapter::{
            ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
        },
        starfish_adapter::LazyStarfishClient,
    };

    /// Builds a test committee (epoch 0) of `size` authorities, each with a
    /// random stake in `0..10`.
    ///
    /// Everything is drawn from `rng`, so the committee is fully determined by
    /// the seed. The voting power is passed to
    /// `new_for_testing_with_normalized_voting_power`.
    fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
        // For each authority a key pair is drawn first, then its stake; the RNG
        // call order is unchanged, so seeded results stay stable.
        let authorities = (0..size)
            .map(|_k| {
                (
                    AuthorityPublicKeyBytes::from(
                        get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
                    ),
                    rng.gen_range(0u64..10u64),
                )
            })
            .collect();
        Committee::new_for_testing_with_normalized_voting_power(0, authorities)
    }

    #[tokio::test]
    async fn test_await_submit_delay_user_transaction() {
        // A fixed seed makes the committee and digest, and hence positions, reproducible.
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        // `Some(1)` / `Some(2s)` override the submit position and delay step. This
        // reading comes from the assertions below; TODO confirm against
        // `ConsensusAdapter::new`.
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyStarfishClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            Some(1),
            Some(Duration::from_secs(2)),
            ConsensusAdapterMetrics::new_test(),
        );

        let tx_digest = TransactionDigest::generate(&mut rng);

        // The test connection monitor reports every peer as connected, so our
        // position must not be shifted. Note that `!positions_moved > 0` would be
        // bitwise NOT on a usize and almost always true; compare to 0 instead.
        let (position, positions_moved, _) =
            consensus_adapter.submission_position(&committee, &tx_digest);
        assert_eq!(position, 7);
        assert_eq!(positions_moved, 0);

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 1);
        assert_eq!(delay_step, Duration::from_secs(2));
        assert_eq!(positions_moved, 0);

        // Same committee and digest, but without overrides: the raw position (7)
        // is kept and the returned delay is 14s (presumably 7 x a 2s default step).
        let consensus_adapter = ConsensusAdapter::new(
            Arc::new(LazyStarfishClient::new()),
            CheckpointStore::new_for_tests(),
            *committee.authority_by_index(0).unwrap(),
            Arc::new(ConnectionMonitorStatusForTests {}),
            100_000,
            100_000,
            None,
            None,
            ConsensusAdapterMetrics::new_test(),
        );

        let (delay_step, position, positions_moved, _) =
            consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);

        assert_eq!(position, 7);
        assert_eq!(delay_step, Duration::from_secs(14));
        assert_eq!(positions_moved, 0);
    }

    #[test]
    fn test_position_submit_certificate() {
        let mut rng = StdRng::from_seed([0; 32]);
        let committee = test_committee(&mut rng, 10);

        const NUM_TEST_TRANSACTIONS: usize = 1000;

        for _tx_idx in 0..NUM_TEST_TRANSACTIONS {
            let tx_digest = TransactionDigest::generate(&mut rng);

            // Every member's position must be a valid committee index, and exactly
            // one member must be at position 0.
            let mut zero_found = false;
            for (name, _) in committee.members() {
                let f = position_submit_certificate(&committee, name, &tx_digest);
                assert!(f < committee.num_members());
                if f == 0 {
                    assert!(!zero_found, "more than one authority at position 0");
                    zero_found = true;
                }
            }
            assert!(zero_found);
        }
    }
}