1use std::{
6 collections::HashMap,
7 future::Future,
8 ops::Deref,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::Instant,
14};
15
16use arc_swap::ArcSwap;
17use dashmap::{DashMap, try_result::TryResult};
18use futures::{
19 FutureExt, StreamExt,
20 future::{self, Either, select},
21 pin_mut,
22 stream::FuturesUnordered,
23};
24use iota_metrics::{GaugeGuard, GaugeGuardFutureExt, LATENCY_SEC_BUCKETS, spawn_monitored_task};
25use iota_simulator::anemo::PeerId;
26use iota_types::{
27 base_types::{AuthorityName, TransactionDigest},
28 committee::Committee,
29 error::{IotaError, IotaResult},
30 fp_ensure,
31 messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
32};
33use itertools::Itertools;
34use parking_lot::RwLockReadGuard;
35use prometheus::{
36 Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
37 register_histogram_vec_with_registry, register_histogram_with_registry,
38 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
39 register_int_gauge_with_registry,
40};
41use tokio::{
42 sync::{Semaphore, SemaphorePermit, oneshot},
43 task::JoinHandle,
44 time::{
45 Duration, {self},
46 },
47};
48use tracing::{debug, info, trace, warn};
49
50use crate::{
51 authority::authority_per_epoch_store::AuthorityPerEpochStore,
52 checkpoints::CheckpointStore,
53 connection_monitor::ConnectionStatus,
54 consensus_handler::{SequencedConsensusTransactionKey, classify},
55 epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator},
56 metrics::LatencyObserver,
57};
58
// Test-only submodule; its source is loaded from the path given below.
#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;
62
/// Bucket boundaries shared by the position-related histograms registered in
/// `ConsensusAdapterMetrics::new` (authority position, positions moved and
/// preceding disconnected authorities).
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
    0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];
66
/// Prometheus metrics recorded by the consensus adapter.
///
/// Every field is registered in `ConsensusAdapterMetrics::new`; the
/// descriptions below mirror the help strings used there.
pub struct ConsensusAdapterMetrics {
    /// Number of certificates the validator attempts to sequence (label: `tx_type`).
    pub sequencing_certificate_attempt: IntCounterVec,
    /// Number of successfully sequenced certificates (label: `tx_type`).
    pub sequencing_certificate_success: IntCounterVec,
    /// Number of sequenced certificates that failed other than by timeout (label: `tx_type`).
    pub sequencing_certificate_failures: IntCounterVec,
    /// Status reported by consensus: sequenced or garbage collected
    /// (labels: `tx_type`, `status`).
    pub sequencing_certificate_status: IntCounterVec,
    /// Inflight requests to sequence certificates (label: `tx_type`).
    pub sequencing_certificate_inflight: IntGaugeVec,
    /// Latency for acknowledgement from the sequencing engine
    /// (labels: `retry`, `tx_type`).
    pub sequencing_acknowledge_latency: HistogramVec,
    /// Latency for sequencing a certificate
    /// (labels: `position`, `tx_type`, `processed_method`).
    pub sequencing_certificate_latency: HistogramVec,
    /// Position of the authority when it submitted a certificate to consensus.
    pub sequencing_certificate_authority_position: Histogram,
    /// Number of authorities ahead of ourselves that were filtered out on submission.
    pub sequencing_certificate_positions_moved: Histogram,
    /// Number of authorities hashed to an earlier position that were filtered
    /// out because they were disconnected.
    pub sequencing_certificate_preceding_disconnected: Histogram,
    /// Number of certificates processed either by consensus or checkpoint
    /// (label: `source`).
    pub sequencing_certificate_processed: IntCounterVec,
    /// How many requests are blocked waiting for a submit permit.
    pub sequencing_in_flight_semaphore_wait: IntGauge,
    /// Transactions submitted to the local consensus instance and not yet sequenced.
    pub sequencing_in_flight_submissions: IntGauge,
    /// Consensus latency estimated by the adapter, in milliseconds.
    pub sequencing_estimated_latency: IntGauge,
    /// Resubmission interval used by the adapter, in milliseconds.
    pub sequencing_resubmission_interval_ms: IntGauge,
}
85
86impl ConsensusAdapterMetrics {
87 pub fn new(registry: &Registry) -> Self {
88 Self {
89 sequencing_certificate_attempt: register_int_counter_vec_with_registry!(
90 "sequencing_certificate_attempt",
91 "Counts the number of certificates the validator attempts to sequence.",
92 &["tx_type"],
93 registry,
94 )
95 .unwrap(),
96 sequencing_certificate_success: register_int_counter_vec_with_registry!(
97 "sequencing_certificate_success",
98 "Counts the number of successfully sequenced certificates.",
99 &["tx_type"],
100 registry,
101 )
102 .unwrap(),
103 sequencing_certificate_failures: register_int_counter_vec_with_registry!(
104 "sequencing_certificate_failures",
105 "Counts the number of sequenced certificates that failed other than by timeout.",
106 &["tx_type"],
107 registry,
108 )
109 .unwrap(),
110 sequencing_certificate_status: register_int_counter_vec_with_registry!(
111 "sequencing_certificate_status",
112 "The status of the certificate sequencing as reported by consensus. The status can be either sequenced or garbage collected.",
113 &["tx_type", "status"],
114 registry,
115 )
116 .unwrap(),
117 sequencing_certificate_inflight: register_int_gauge_vec_with_registry!(
118 "sequencing_certificate_inflight",
119 "The inflight requests to sequence certificates.",
120 &["tx_type"],
121 registry,
122 )
123 .unwrap(),
124 sequencing_acknowledge_latency: register_histogram_vec_with_registry!(
125 "sequencing_acknowledge_latency",
126 "The latency for acknowledgement from sequencing engine. The overall sequencing latency is measured by the sequencing_certificate_latency metric",
127 &["retry", "tx_type"],
128 LATENCY_SEC_BUCKETS.to_vec(),
129 registry,
130 )
131 .unwrap(),
132 sequencing_certificate_latency: register_histogram_vec_with_registry!(
133 "sequencing_certificate_latency",
134 "The latency for sequencing a certificate.",
135 &["position", "tx_type", "processed_method"],
136 LATENCY_SEC_BUCKETS.to_vec(),
137 registry,
138 )
139 .unwrap(),
140 sequencing_certificate_authority_position: register_histogram_with_registry!(
141 "sequencing_certificate_authority_position",
142 "The position of the authority when submitted a certificate to consensus.",
143 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
144 registry,
145 )
146 .unwrap(),
147 sequencing_certificate_positions_moved: register_histogram_with_registry!(
148 "sequencing_certificate_positions_moved",
149 "The number of authorities ahead of ourselves that were filtered out when submitting a certificate to consensus.",
150 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
151 registry,
152 )
153 .unwrap(),
154 sequencing_certificate_preceding_disconnected: register_histogram_with_registry!(
155 "sequencing_certificate_preceding_disconnected",
156 "The number of authorities that were hashed to an earlier position that were filtered out due to being disconnected when submitting to consensus.",
157 SEQUENCING_CERTIFICATE_POSITION_BUCKETS.to_vec(),
158 registry,
159 )
160 .unwrap(),
161 sequencing_certificate_processed: register_int_counter_vec_with_registry!(
162 "sequencing_certificate_processed",
163 "The number of certificates that have been processed either by consensus or checkpoint.",
164 &["source"],
165 registry
166 )
167 .unwrap(),
168 sequencing_in_flight_semaphore_wait: register_int_gauge_with_registry!(
169 "sequencing_in_flight_semaphore_wait",
170 "How many requests are blocked on submit_permit.",
171 registry,
172 )
173 .unwrap(),
174 sequencing_in_flight_submissions: register_int_gauge_with_registry!(
175 "sequencing_in_flight_submissions",
176 "Number of transactions submitted to local consensus instance and not yet sequenced",
177 registry,
178 )
179 .unwrap(),
180 sequencing_estimated_latency: register_int_gauge_with_registry!(
181 "sequencing_estimated_latency",
182 "Consensus latency estimated by consensus adapter in milliseconds",
183 registry,
184 )
185 .unwrap(),
186 sequencing_resubmission_interval_ms: register_int_gauge_with_registry!(
187 "sequencing_resubmission_interval_ms",
188 "Resubmission interval used by consensus adapter in milliseconds",
189 registry,
190 )
191 .unwrap(),
192 }
193 }
194
195 pub fn new_test() -> Self {
196 Self::new(&Registry::default())
197 }
198}
199
/// Payload-free view of `starfish_core::BlockStatus` (see the `From` impl),
/// delivered to the adapter through a `BlockStatusReceiver`.
pub enum BlockStatusInternal {
    /// The block was sequenced.
    Sequenced,
    /// The block was garbage collected; the adapter resubmits in this case.
    GarbageCollected,
}
205
206impl From<starfish_core::BlockStatus> for BlockStatusInternal {
207 fn from(status: starfish_core::BlockStatus) -> Self {
208 match status {
209 starfish_core::BlockStatus::Sequenced(_) => BlockStatusInternal::Sequenced,
210 starfish_core::BlockStatus::GarbageCollected(_) => {
211 BlockStatusInternal::GarbageCollected
212 }
213 }
214 }
215}
216
/// One-shot channel on which a `ConsensusClient` reports the final status of a
/// submitted block.
pub type BlockStatusReceiver = oneshot::Receiver<BlockStatusInternal>;
218
/// Synchronous entry point for handing transactions to consensus.
///
/// In this file it is implemented for `Arc<ConsensusAdapter>` by delegating to
/// `ConsensusAdapter::submit_batch`.
#[mockall::automock]
pub trait SubmitToConsensus: Sync + Send + 'static {
    /// Submits `transactions` for the epoch represented by `epoch_store`.
    fn submit_to_consensus(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult;
}
227
/// Asynchronous client used by `ConsensusAdapter` to push transactions into
/// the consensus engine.
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConsensusClient: Sync + Send + 'static {
    /// Submits `transactions` and, on success, returns a receiver on which the
    /// resulting block status is delivered. The adapter retries the call when
    /// it returns an error.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<BlockStatusReceiver>;
}
237
/// Submits transactions to consensus and waits until they are processed.
pub struct ConsensusAdapter {
    /// Client used to hand transactions to the consensus engine.
    consensus_client: Arc<dyn ConsensusClient>,
    /// Used to wait for synced checkpoints when tracking checkpoint signatures.
    checkpoint_store: Arc<CheckpointStore>,
    /// Name of the authority this adapter runs for.
    authority: AuthorityName,
    /// `check_limits` reports overload once the in-flight count exceeds this.
    max_pending_transactions: usize,
    /// Current number of in-flight transactions, maintained by `InflightDropGuard`.
    num_inflight_transactions: AtomicU64,
    /// Optional cap applied to the submission position when computing the delay.
    max_submit_position: Option<usize>,
    /// Optional fixed delay step replacing the latency-derived one.
    submit_delay_step_override: Option<Duration>,
    /// Source of connectivity information about other authorities.
    connection_monitor_status: Arc<dyn CheckConnection>,
    /// Authorities currently treated as low scoring; replaced as a whole by
    /// `swap_low_scoring_authorities`.
    low_scoring_authorities: ArcSwap<Arc<ArcSwap<HashMap<AuthorityName, u64>>>>,
    /// Metrics recorded by the adapter.
    metrics: ConsensusAdapterMetrics,
    /// Bounds the number of concurrent submissions to the consensus client.
    submit_semaphore: Semaphore,
    /// Supplies the latency estimate used for submission delays and receives
    /// samples when an `InflightDropGuard` is dropped.
    latency_observer: LatencyObserver,
}
268
/// Connectivity oracle consulted when computing the submission position.
pub trait CheckConnection: Send + Sync {
    /// Returns the connection status of `authority` as seen from `ourself`,
    /// or `None` when no status is known.
    fn check_connection(
        &self,
        ourself: &AuthorityName,
        authority: &AuthorityName,
    ) -> Option<ConnectionStatus>;
    /// Replaces the authority-name-to-peer-id mapping.
    fn update_mapping_for_epoch(&self, authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>);
}
277
/// `CheckConnection` implementation backed by per-peer connection statuses.
pub struct ConnectionMonitorStatus {
    /// Connection status per peer id, read by `check_connection`.
    pub connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
    /// Authority name to peer id mapping, swapped by `update_mapping_for_epoch`.
    pub authority_names_to_peer_ids: ArcSwap<HashMap<AuthorityName, PeerId>>,
}
284
/// `CheckConnection` stub whose implementation reports every authority as connected.
pub struct ConnectionMonitorStatusForTests {}
286
287impl ConsensusAdapter {
288 pub fn new(
290 consensus_client: Arc<dyn ConsensusClient>,
291 checkpoint_store: Arc<CheckpointStore>,
292 authority: AuthorityName,
293 connection_monitor_status: Arc<dyn CheckConnection>,
294 max_pending_transactions: usize,
295 max_pending_local_submissions: usize,
296 max_submit_position: Option<usize>,
297 submit_delay_step_override: Option<Duration>,
298 metrics: ConsensusAdapterMetrics,
299 ) -> Self {
300 let num_inflight_transactions = Default::default();
301 let low_scoring_authorities =
302 ArcSwap::from_pointee(Arc::new(ArcSwap::from_pointee(HashMap::new())));
303 Self {
304 consensus_client,
305 checkpoint_store,
306 authority,
307 max_pending_transactions,
308 max_submit_position,
309 submit_delay_step_override,
310 num_inflight_transactions,
311 connection_monitor_status,
312 low_scoring_authorities,
313 metrics,
314 submit_semaphore: Semaphore::new(max_pending_local_submissions),
315 latency_observer: LatencyObserver::new(),
316 }
317 }
318
319 pub fn swap_low_scoring_authorities(
320 &self,
321 new_low_scoring: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
322 ) {
323 self.low_scoring_authorities.swap(Arc::new(new_low_scoring));
324 }
325
326 pub fn submit_recovered(self: &Arc<Self>, epoch_store: &Arc<AuthorityPerEpochStore>) {
327 let mut recovered = epoch_store.get_all_pending_consensus_transactions();
333
334 if epoch_store
335 .get_reconfig_state_read_lock_guard()
336 .is_reject_user_certs()
337 && epoch_store.pending_consensus_certificates_empty()
338 {
339 if !recovered
341 .iter()
342 .any(ConsensusTransaction::is_end_of_publish)
343 {
344 recovered.push(ConsensusTransaction::new_end_of_publish(self.authority));
353 }
354 }
355 debug!(
356 "Submitting {:?} recovered pending consensus transactions to consensus",
357 recovered.len()
358 );
359 for transaction in recovered {
360 if transaction.is_end_of_publish() {
361 info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
362 }
363 self.submit_unchecked(&[transaction], epoch_store);
364 }
365 }
366
367 fn await_submit_delay(
368 &self,
369 committee: &Committee,
370 transactions: &[ConsensusTransaction],
371 ) -> (impl Future<Output = ()>, usize, usize, usize) {
372 let min_digest = transactions
374 .iter()
375 .filter_map(|tx| match &tx.kind {
376 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
377 Some(certificate.digest())
378 }
379 _ => None,
380 })
381 .min();
382
383 let (duration, position, positions_moved, preceding_disconnected) = match min_digest {
384 Some(digest) => self.await_submit_delay_user_transaction(committee, digest),
385 _ => (Duration::ZERO, 0, 0, 0),
386 };
387 (
388 tokio::time::sleep(duration),
389 position,
390 positions_moved,
391 preceding_disconnected,
392 )
393 }
394
395 fn await_submit_delay_user_transaction(
396 &self,
397 committee: &Committee,
398 tx_digest: &TransactionDigest,
399 ) -> (Duration, usize, usize, usize) {
400 let (position, positions_moved, preceding_disconnected) =
401 self.submission_position(committee, tx_digest);
402
403 const DEFAULT_LATENCY: Duration = Duration::from_secs(1); const MIN_LATENCY: Duration = Duration::from_millis(150);
405 const MAX_LATENCY: Duration = Duration::from_secs(3);
406
407 let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
408 self.metrics
409 .sequencing_estimated_latency
410 .set(latency.as_millis() as i64);
411
412 let latency = std::cmp::max(latency, MIN_LATENCY);
413 let latency = std::cmp::min(latency, MAX_LATENCY);
414 let latency = latency * 2;
415 let (delay_step, position) =
416 self.override_by_max_submit_position_settings(latency, position);
417
418 self.metrics
419 .sequencing_resubmission_interval_ms
420 .set(delay_step.as_millis() as i64);
421
422 (
423 delay_step * position as u32,
424 position,
425 positions_moved,
426 preceding_disconnected,
427 )
428 }
429
430 fn override_by_max_submit_position_settings(
436 &self,
437 latency: Duration,
438 mut position: usize,
439 ) -> (Duration, usize) {
440 if let Some(max_submit_position) = self.max_submit_position {
442 position = std::cmp::min(position, max_submit_position);
443 }
444
445 let delay_step = self.submit_delay_step_override.unwrap_or(latency);
446 (delay_step, position)
447 }
448
449 fn submission_position(
459 &self,
460 committee: &Committee,
461 tx_digest: &TransactionDigest,
462 ) -> (usize, usize, usize) {
463 let positions = committee.shuffle_by_stake_from_tx_digest(tx_digest);
464
465 self.check_submission_wrt_connectivity_and_scores(positions)
466 }
467
468 fn check_submission_wrt_connectivity_and_scores(
495 &self,
496 positions: Vec<AuthorityName>,
497 ) -> (usize, usize, usize) {
498 let low_scoring_authorities = self.low_scoring_authorities.load().load_full();
499 if low_scoring_authorities.get(&self.authority).is_some() {
500 return (positions.len(), 0, 0);
501 }
502 let initial_position = get_position_in_list(self.authority, positions.clone());
503 let mut preceding_disconnected = 0;
504 let mut before_our_position = true;
505
506 let filtered_positions: Vec<_> = positions
507 .into_iter()
508 .filter(|authority| {
509 let keep = self.authority == *authority; if keep {
511 before_our_position = false;
512 }
513
514 let connected = self
516 .connection_monitor_status
517 .check_connection(&self.authority, authority)
518 .unwrap_or(ConnectionStatus::Disconnected)
519 == ConnectionStatus::Connected;
520 if !connected && before_our_position {
521 preceding_disconnected += 1; }
523
524 let high_scoring = low_scoring_authorities.get(authority).is_none();
526
527 keep || (connected && high_scoring)
528 })
529 .collect();
530
531 let position = get_position_in_list(self.authority, filtered_positions);
532
533 (
534 position,
535 initial_position - position,
536 preceding_disconnected,
537 )
538 }
539
540 pub fn submit(
551 self: &Arc<Self>,
552 transaction: ConsensusTransaction,
553 lock: Option<&RwLockReadGuard<ReconfigState>>,
554 epoch_store: &Arc<AuthorityPerEpochStore>,
555 ) -> IotaResult<JoinHandle<()>> {
556 self.submit_batch(&[transaction], lock, epoch_store)
557 }
558
559 pub fn submit_batch(
560 self: &Arc<Self>,
561 transactions: &[ConsensusTransaction],
562 lock: Option<&RwLockReadGuard<ReconfigState>>,
563 epoch_store: &Arc<AuthorityPerEpochStore>,
564 ) -> IotaResult<JoinHandle<()>> {
565 if transactions.len() > 1 {
566 for transaction in transactions {
570 fp_ensure!(
571 matches!(
572 transaction.kind,
573 ConsensusTransactionKind::CertifiedTransaction(_)
574 ),
575 IotaError::InvalidTxKindInSoftBundle
576 );
577 }
578 }
579
580 epoch_store.insert_pending_consensus_transactions(transactions, lock)?;
581 Ok(self.submit_unchecked(transactions, epoch_store))
582 }
583
584 pub fn check_limits(&self) -> bool {
587 if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
589 > self.max_pending_transactions
590 {
591 return false;
592 }
593 self.submit_semaphore.available_permits() > 0
595 }
596
597 pub(crate) fn check_consensus_overload(&self) -> IotaResult {
598 fp_ensure!(
599 self.check_limits(),
600 IotaError::TooManyTransactionsPendingConsensus
601 );
602 Ok(())
603 }
604
605 fn submit_unchecked(
606 self: &Arc<Self>,
607 transactions: &[ConsensusTransaction],
608 epoch_store: &Arc<AuthorityPerEpochStore>,
609 ) -> JoinHandle<()> {
610 let async_stage = self
613 .clone()
614 .submit_and_wait(transactions.to_vec(), epoch_store.clone());
615 let join_handle = spawn_monitored_task!(async_stage);
618 join_handle
619 }
620
621 async fn submit_and_wait(
622 self: Arc<Self>,
623 transactions: Vec<ConsensusTransaction>,
624 epoch_store: Arc<AuthorityPerEpochStore>,
625 ) {
626 epoch_store
640 .within_alive_epoch(self.submit_and_wait_inner(transactions, &epoch_store))
641 .await
642 .ok(); }
645
/// Submits `transactions` to consensus and waits until they are processed.
///
/// Steps, in order:
/// 1. Wait for the position-based submission delay, unless the epoch closes
///    user certificates or the transactions are observed as processed first.
/// 2. If still needed, acquire a submission permit and submit, retrying
///    until the block is reported as sequenced, while concurrently waiting
///    for the processed notification.
/// 3. Remove the transactions from the pending table and, if this was the
///    last pending user certificate after user certs were closed, send
///    `EndOfPublish`.
///
/// # Panics
///
/// Panics on a storage error when removing the pending transactions, or if
/// the submission semaphore has been closed.
async fn submit_and_wait_inner(
    self: Arc<Self>,
    transactions: Vec<ConsensusTransaction>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) {
    if transactions.is_empty() {
        return;
    }

    // More than one transaction means the batch is handled as a soft bundle.
    let is_soft_bundle = transactions.len() > 1;

    let mut transaction_keys = Vec::new();

    for transaction in &transactions {
        if matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..)) {
            info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
            epoch_store.record_epoch_pending_certs_process_time_metric();
        }

        let transaction_key = SequencedConsensusTransactionKey::External(transaction.key());
        transaction_keys.push(transaction_key);
    }
    // Metric label: the class of the single transaction, or a fixed label for bundles.
    let tx_type = if !is_soft_bundle {
        classify(&transactions[0])
    } else {
        "soft_bundle"
    };

    // Tracks the in-flight count and records latency/position metrics on drop.
    let mut guard = InflightDropGuard::acquire(&self, tx_type);

    let (await_submit, position, positions_moved, preceding_disconnected) =
        self.await_submit_delay(epoch_store.committee(), &transactions[..]);

    let processed_via_consensus_or_checkpoint =
        self.await_consensus_or_checkpoint(transaction_keys.clone(), epoch_store);
    pin_mut!(processed_via_consensus_or_checkpoint);

    // `Some(waiter)` means we still have to submit; `None` means the
    // transactions were already processed while we were waiting.
    // NOTE(review): in the `None` case the resolved `ProcessedMethod` is
    // discarded, so `guard.processed_method` keeps its initial value even if
    // processing happened via checkpoint — confirm this is intended.
    let processed_waiter = tokio::select! {
        // The submission delay elapsed.
        _ = await_submit => Some(processed_via_consensus_or_checkpoint),

        // User certs were closed: skip the remaining delay.
        _ = epoch_store.user_certs_closed_notify() => {
            warn!(epoch = ?epoch_store.epoch(), "Epoch ended, skipping submission delay");
            Some(processed_via_consensus_or_checkpoint)
        }

        // Already processed before we got to submit.
        _ = &mut processed_via_consensus_or_checkpoint => {
            None
        }
    };

    // For a single transaction of one of the kinds below, spawn a task that
    // warns every 30 seconds while we are still waiting. `CancelOnDrop`
    // aborts it when `_monitor` goes out of scope.
    let _monitor = if !is_soft_bundle
        && matches!(
            transactions[0].kind,
            ConsensusTransactionKind::EndOfPublish(_)
                | ConsensusTransactionKind::CapabilityNotificationV1(_)
                | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
        ) {
        let transaction_keys = transaction_keys.clone();
        Some(CancelOnDrop(spawn_monitored_task!(async {
            let mut i = 0u64;
            loop {
                i += 1;
                const WARN_DELAY_S: u64 = 30;
                tokio::time::sleep(Duration::from_secs(WARN_DELAY_S)).await;
                let total_wait = i * WARN_DELAY_S;
                warn!(
                    "Still waiting {} seconds for transactions {:?} to commit in consensus",
                    total_wait, transaction_keys
                );
            }
        })))
    } else {
        None
    };

    if let Some(processed_waiter) = processed_waiter {
        debug!("Submitting {:?} to consensus", transaction_keys);

        // Position statistics are only recorded when we actually submit.
        guard.position = Some(position);
        guard.positions_moved = Some(positions_moved);
        guard.preceding_disconnected = Some(preceding_disconnected);

        // Limit concurrent submissions; the wait is counted in a gauge.
        let _permit: SemaphorePermit = self
            .submit_semaphore
            .acquire()
            .count_in_flight(&self.metrics.sequencing_in_flight_semaphore_wait)
            .await
            .expect("Consensus adapter does not close semaphore");
        let _in_flight_submission_guard =
            GaugeGuard::acquire(&self.metrics.sequencing_in_flight_submissions);

        // Submit and wait for the block status, resubmitting after
        // `RETRY_DELAY_STEP` when the block is garbage collected or the
        // status channel fails.
        let submit_inner = async {
            const RETRY_DELAY_STEP: Duration = Duration::from_secs(1);

            loop {
                let status_waiter = self
                    .submit_inner(
                        &transactions,
                        epoch_store,
                        &transaction_keys,
                        tx_type,
                        is_soft_bundle,
                    )
                    .await;

                match status_waiter.await {
                    Ok(BlockStatusInternal::Sequenced) => {
                        self.metrics
                            .sequencing_certificate_status
                            .with_label_values(&[tx_type, "sequenced"])
                            .inc();
                        trace!(
                            "Transaction {transaction_keys:?} has been sequenced by consensus."
                        );
                        break;
                    }
                    Ok(BlockStatusInternal::GarbageCollected) => {
                        self.metrics
                            .sequencing_certificate_status
                            .with_label_values(&[tx_type, "garbage_collected"])
                            .inc();
                        debug!(
                            "Transaction {transaction_keys:?} was garbage collected before being sequenced. Will be retried."
                        );
                        time::sleep(RETRY_DELAY_STEP).await;
                        continue;
                    }
                    Err(err) => {
                        warn!(
                            "Error while waiting for status from consensus for transactions {transaction_keys:?}, with error {:?}. Will be retried.",
                            err
                        );
                        time::sleep(RETRY_DELAY_STEP).await;
                        continue;
                    }
                }
            }
        };

        // Race the processed notification against the submission. If the
        // submission finishes first we still wait for the notification.
        guard.processed_method = match select(processed_waiter, submit_inner.boxed()).await {
            Either::Left((observed_via_consensus, _submit_inner)) => observed_via_consensus,
            Either::Right(((), processed_waiter)) => {
                debug!("Submitted {transaction_keys:?} to consensus");
                processed_waiter.await
            }
        };
    }
    debug!("{transaction_keys:?} processed by consensus");

    let consensus_keys: Vec<_> = transactions.iter().map(|t| t.key()).collect();
    epoch_store
        .remove_pending_consensus_transactions(&consensus_keys)
        .expect("Storage error when removing consensus transaction");

    // Soft bundles only contain certified transactions (see `submit_batch`).
    let is_user_tx = is_soft_bundle
        || matches!(
            transactions[0].kind,
            ConsensusTransactionKind::CertifiedTransaction(_)
        );
    // EndOfPublish is sent once user certs are rejected and no certificate
    // is pending any more.
    let send_end_of_publish = if is_user_tx {
        if epoch_store
            .get_reconfig_state_read_lock_guard()
            .is_reject_user_certs()
        {
            let pending_count = epoch_store.pending_consensus_certificates_count();
            debug!(epoch=?epoch_store.epoch(), ?pending_count, "Deciding whether to send EndOfPublish");
            pending_count == 0
        } else {
            false
        }
    } else {
        false
    };
    if send_end_of_publish {
        info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
        if let Err(err) = self.submit(
            ConsensusTransaction::new_end_of_publish(self.authority),
            None,
            epoch_store,
        ) {
            warn!("Error when sending end of publish message: {:?}", err);
        }
    }
    self.metrics
        .sequencing_certificate_success
        .with_label_values(&[tx_type])
        .inc();
}
874
875 async fn submit_inner(
876 self: &Arc<Self>,
877 transactions: &[ConsensusTransaction],
878 epoch_store: &Arc<AuthorityPerEpochStore>,
879 transaction_keys: &[SequencedConsensusTransactionKey],
880 tx_type: &str,
881 is_soft_bundle: bool,
882 ) -> BlockStatusReceiver {
883 let ack_start = Instant::now();
884 let mut retries: u32 = 0;
885
886 let status_waiter = loop {
887 match self
888 .consensus_client
889 .submit(transactions, epoch_store)
890 .await
891 {
892 Err(err) => {
893 if retries > 30
896 || (retries > 3 && (is_soft_bundle || !transactions[0].kind.is_dkg()))
897 {
898 warn!(
899 "Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
900 );
901 }
902 self.metrics
903 .sequencing_certificate_failures
904 .with_label_values(&[tx_type])
905 .inc();
906 retries += 1;
907
908 if !is_soft_bundle && transactions[0].kind.is_dkg() {
909 time::sleep(Duration::from_millis(100)).await;
912 } else {
913 time::sleep(Duration::from_secs(10)).await;
914 };
915 }
916 Ok(status_waiter) => {
917 break status_waiter;
918 }
919 }
920 };
921
922 let bucket = match retries {
926 0..=10 => retries.to_string(), 11..=20 => "between_10_and_20".to_string(),
928 21..=50 => "between_20_and_50".to_string(),
929 51..=100 => "between_50_and_100".to_string(),
930 _ => "over_100".to_string(),
931 };
932
933 self.metrics
934 .sequencing_acknowledge_latency
935 .with_label_values(&[bucket.as_str(), tx_type])
936 .observe(ack_start.elapsed().as_secs_f64());
937
938 status_waiter
939 }
940
/// Waits until every key in `transaction_keys` has been processed.
///
/// For each key, the first of these events to fire wins:
/// - the epoch store reports the consensus message as processed,
/// - for certificate keys only: the transaction is reported as executed in
///   a checkpoint,
/// - for checkpoint-signature keys only: the checkpoint store reports the
///   referenced checkpoint as synced.
///
/// Returns `ProcessedMethod::Checkpoint` if at least one key was resolved by
/// one of the two checkpoint paths, otherwise `ProcessedMethod::Consensus`.
///
/// # Panics
///
/// Panics if either epoch-store notification returns an error.
async fn await_consensus_or_checkpoint(
    self: &Arc<Self>,
    transaction_keys: Vec<SequencedConsensusTransactionKey>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> ProcessedMethod {
    let notifications = FuturesUnordered::new();
    for transaction_key in transaction_keys {
        // Only certificate keys carry a digest to look up in checkpoints.
        let transaction_digests = if let SequencedConsensusTransactionKey::External(
            ConsensusTransactionKey::Certificate(digest),
        ) = transaction_key
        {
            vec![digest]
        } else {
            vec![]
        };

        // For any other key kind this branch never completes.
        let checkpoint_synced_future = if let SequencedConsensusTransactionKey::External(
            ConsensusTransactionKey::CheckpointSignature(_, checkpoint_sequence_number),
        ) = transaction_key
        {
            Either::Left(
                self.checkpoint_store
                    .notify_read_synced_checkpoint(checkpoint_sequence_number),
            )
        } else {
            Either::Right(future::pending())
        };

        notifications.push(async move {
            tokio::select! {
                processed = epoch_store.consensus_messages_processed_notify(vec![transaction_key]) => {
                    processed.expect("Storage error when waiting for consensus message processed");
                    self.metrics.sequencing_certificate_processed.with_label_values(&["consensus"]).inc();
                    return ProcessedMethod::Consensus;
                },
                // The precondition disables this branch when there is no digest to wait for.
                processed = epoch_store.transactions_executed_in_checkpoint_notify(transaction_digests), if !transaction_digests.is_empty() => {
                    processed.expect("Storage error when waiting for transaction executed in checkpoint");
                    self.metrics.sequencing_certificate_processed.with_label_values(&["checkpoint"]).inc();
                }
                _ = checkpoint_synced_future => {
                    self.metrics.sequencing_certificate_processed.with_label_values(&["synced_checkpoint"]).inc();
                }
            }
            // Reached from both checkpoint branches above.
            ProcessedMethod::Checkpoint
        });
    }

    // Wait for all keys; a single checkpoint resolution decides the overall result.
    let processed_methods = notifications.collect::<Vec<ProcessedMethod>>().await;
    for method in processed_methods {
        if method == ProcessedMethod::Checkpoint {
            return ProcessedMethod::Checkpoint;
        }
    }
    ProcessedMethod::Consensus
}
1010}
1011
1012impl CheckConnection for ConnectionMonitorStatus {
1013 fn check_connection(
1014 &self,
1015 ourself: &AuthorityName,
1016 authority: &AuthorityName,
1017 ) -> Option<ConnectionStatus> {
1018 if ourself == authority {
1019 return Some(ConnectionStatus::Connected);
1020 }
1021
1022 let mapping = self.authority_names_to_peer_ids.load_full();
1023 let peer_id = match mapping.get(authority) {
1024 Some(p) => p,
1025 None => {
1026 warn!(
1027 "failed to find peer {:?} in connection monitor listener",
1028 authority
1029 );
1030 return None;
1031 }
1032 };
1033
1034 let res = match self.connection_statuses.try_get(peer_id) {
1035 TryResult::Present(c) => Some(c.value().clone()),
1036 TryResult::Absent => None,
1037 TryResult::Locked => {
1038 Some(ConnectionStatus::Disconnected)
1040 }
1041 };
1042 res
1043 }
1044 fn update_mapping_for_epoch(
1045 &self,
1046 authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
1047 ) {
1048 self.authority_names_to_peer_ids
1049 .swap(Arc::new(authority_names_to_peer_ids));
1050 }
1051}
1052
/// Test stub: every authority is reported as connected and mapping updates
/// are ignored.
impl CheckConnection for ConnectionMonitorStatusForTests {
    /// Always returns `Some(ConnectionStatus::Connected)`.
    fn check_connection(
        &self,
        _ourself: &AuthorityName,
        _authority: &AuthorityName,
    ) -> Option<ConnectionStatus> {
        Some(ConnectionStatus::Connected)
    }
    /// No-op.
    fn update_mapping_for_epoch(
        &self,
        _authority_names_to_peer_ids: HashMap<AuthorityName, PeerId>,
    ) {
    }
}
1067
1068pub fn get_position_in_list(
1069 search_authority: AuthorityName,
1070 positions: Vec<AuthorityName>,
1071) -> usize {
1072 positions
1073 .into_iter()
1074 .find_position(|authority| *authority == search_authority)
1075 .expect("Couldn't find ourselves in shuffled committee")
1076 .0
1077}
1078
1079impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
1080 fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) {
1085 let send_end_of_publish = {
1086 let reconfig_guard = epoch_store.get_reconfig_state_write_lock_guard();
1087 if !reconfig_guard.should_accept_user_certs() {
1088 return;
1090 }
1091 let pending_count = epoch_store.pending_consensus_certificates_count();
1092 debug!(epoch=?epoch_store.epoch(), ?pending_count, "Trying to close epoch");
1093 let send_end_of_publish = pending_count == 0;
1094 epoch_store.close_user_certs(reconfig_guard);
1095 send_end_of_publish
1096 };
1098 if send_end_of_publish {
1099 info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
1100 if let Err(err) = self.submit(
1101 ConsensusTransaction::new_end_of_publish(self.authority),
1102 None,
1103 epoch_store,
1104 ) {
1105 warn!("Error when sending end of publish message: {:?}", err);
1106 }
1107 }
1108 }
1109}
1110
1111struct CancelOnDrop<T>(JoinHandle<T>);
1112
1113impl<T> Deref for CancelOnDrop<T> {
1114 type Target = JoinHandle<T>;
1115
1116 fn deref(&self) -> &Self::Target {
1117 &self.0
1118 }
1119}
1120
1121impl<T> Drop for CancelOnDrop<T> {
1122 fn drop(&mut self) {
1123 self.0.abort();
1124 }
1125}
1126
/// Guard covering one in-flight submission.
///
/// Acquiring it bumps the adapter's in-flight counters; dropping it reverts
/// them and records position and latency metrics.
struct InflightDropGuard<'a> {
    /// Adapter whose counters and metrics are updated.
    adapter: &'a ConsensusAdapter,
    /// Creation time of the guard, used to compute the latency on drop.
    start: Instant,
    /// Submission position; `None` until the transaction is actually submitted.
    position: Option<usize>,
    /// How many positions we moved forward; `None` until submitted.
    positions_moved: Option<usize>,
    /// Disconnected authorities ahead of us; `None` until submitted.
    preceding_disconnected: Option<usize>,
    /// Metric label describing the transaction type.
    tx_type: &'static str,
    /// How the transaction ended up being processed; starts as `Consensus`.
    processed_method: ProcessedMethod,
}
1137
/// How a submitted transaction was observed as processed.
#[derive(PartialEq, Eq)]
enum ProcessedMethod {
    /// Reported as processed through the consensus message notification.
    Consensus,
    /// Observed through a checkpoint (executed in, or synced, checkpoint).
    Checkpoint,
}
1143
1144impl<'a> InflightDropGuard<'a> {
1145 pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
1146 adapter
1147 .num_inflight_transactions
1148 .fetch_add(1, Ordering::SeqCst);
1149 adapter
1150 .metrics
1151 .sequencing_certificate_inflight
1152 .with_label_values(&[tx_type])
1153 .inc();
1154 adapter
1155 .metrics
1156 .sequencing_certificate_attempt
1157 .with_label_values(&[tx_type])
1158 .inc();
1159 Self {
1160 adapter,
1161 start: Instant::now(),
1162 position: None,
1163 positions_moved: None,
1164 preceding_disconnected: None,
1165 tx_type,
1166 processed_method: ProcessedMethod::Consensus,
1167 }
1168 }
1169}
1170
1171impl Drop for InflightDropGuard<'_> {
1172 fn drop(&mut self) {
1173 self.adapter
1174 .num_inflight_transactions
1175 .fetch_sub(1, Ordering::SeqCst);
1176 self.adapter
1177 .metrics
1178 .sequencing_certificate_inflight
1179 .with_label_values(&[self.tx_type])
1180 .dec();
1181
1182 let position = if let Some(position) = self.position {
1183 self.adapter
1184 .metrics
1185 .sequencing_certificate_authority_position
1186 .observe(position as f64);
1187 position.to_string()
1188 } else {
1189 "not_submitted".to_string()
1190 };
1191
1192 if let Some(positions_moved) = self.positions_moved {
1193 self.adapter
1194 .metrics
1195 .sequencing_certificate_positions_moved
1196 .observe(positions_moved as f64);
1197 };
1198
1199 if let Some(preceding_disconnected) = self.preceding_disconnected {
1200 self.adapter
1201 .metrics
1202 .sequencing_certificate_preceding_disconnected
1203 .observe(preceding_disconnected as f64);
1204 };
1205
1206 let latency = self.start.elapsed();
1207 let processed_method = match self.processed_method {
1208 ProcessedMethod::Consensus => "processed_via_consensus",
1209 ProcessedMethod::Checkpoint => "processed_via_checkpoint",
1210 };
1211 self.adapter
1212 .metrics
1213 .sequencing_certificate_latency
1214 .with_label_values(&[position.as_str(), self.tx_type, processed_method])
1215 .observe(latency.as_secs_f64());
1216
1217 if self.position == Some(0) {
1223 let sampled = matches!(
1226 self.tx_type,
1227 "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
1228 );
1229 if sampled && self.processed_method == ProcessedMethod::Consensus {
1232 self.adapter.latency_observer.report(latency);
1233 }
1234 }
1235 }
1236}
1237
1238impl SubmitToConsensus for Arc<ConsensusAdapter> {
1239 fn submit_to_consensus(
1240 &self,
1241 transactions: &[ConsensusTransaction],
1242 epoch_store: &Arc<AuthorityPerEpochStore>,
1243 ) -> IotaResult {
1244 self.submit_batch(transactions, None, epoch_store)
1245 .map(|_| ())
1246 }
1247}
1248
1249pub fn position_submit_certificate(
1250 committee: &Committee,
1251 ourselves: &AuthorityName,
1252 tx_digest: &TransactionDigest,
1253) -> usize {
1254 let validators = committee.shuffle_by_stake_from_tx_digest(tx_digest);
1255 get_position_in_list(*ourselves, validators)
1256}
1257
1258#[cfg(test)]
1259mod adapter_tests {
1260 use std::{sync::Arc, time::Duration};
1261
1262 use fastcrypto::traits::KeyPair;
1263 use iota_types::{
1264 base_types::TransactionDigest,
1265 committee::Committee,
1266 crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes, get_key_pair_from_rng},
1267 };
1268 use rand::{Rng, SeedableRng, rngs::StdRng};
1269
1270 use super::position_submit_certificate;
1271 use crate::{
1272 checkpoints::CheckpointStore,
1273 consensus_adapter::{
1274 ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
1275 },
1276 starfish_adapter::LazyStarfishClient,
1277 };
1278
1279 fn test_committee(rng: &mut StdRng, size: usize) -> Committee {
1280 let authorities = (0..size)
1281 .map(|_k| {
1282 (
1283 AuthorityPublicKeyBytes::from(
1284 get_key_pair_from_rng::<AuthorityKeyPair, _>(rng).1.public(),
1285 ),
1286 rng.gen_range(0u64..10u64),
1287 )
1288 })
1289 .collect::<Vec<_>>();
1290 Committee::new_for_testing_with_normalized_voting_power(
1291 0,
1292 authorities.iter().cloned().collect(),
1293 )
1294 }
1295
1296 #[tokio::test]
1297 async fn test_await_submit_delay_user_transaction() {
1298 let mut rng = StdRng::from_seed([0; 32]);
1300 let committee = test_committee(&mut rng, 10);
1301
1302 let consensus_adapter = ConsensusAdapter::new(
1304 Arc::new(LazyStarfishClient::new()),
1305 CheckpointStore::new_for_tests(),
1306 *committee.authority_by_index(0).unwrap(),
1307 Arc::new(ConnectionMonitorStatusForTests {}),
1308 100_000,
1309 100_000,
1310 Some(1),
1311 Some(Duration::from_secs(2)),
1312 ConsensusAdapterMetrics::new_test(),
1313 );
1314
1315 let tx_digest = TransactionDigest::generate(&mut rng);
1317
1318 let (position, positions_moved, _) =
1320 consensus_adapter.submission_position(&committee, &tx_digest);
1321 assert_eq!(position, 7);
1322 assert!(!positions_moved > 0);
1323
1324 let (delay_step, position, positions_moved, _) =
1326 consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);
1327
1328 assert_eq!(position, 1);
1329 assert_eq!(delay_step, Duration::from_secs(2));
1330 assert!(!positions_moved > 0);
1331
1332 let consensus_adapter = ConsensusAdapter::new(
1334 Arc::new(LazyStarfishClient::new()),
1335 CheckpointStore::new_for_tests(),
1336 *committee.authority_by_index(0).unwrap(),
1337 Arc::new(ConnectionMonitorStatusForTests {}),
1338 100_000,
1339 100_000,
1340 None,
1341 None,
1342 ConsensusAdapterMetrics::new_test(),
1343 );
1344
1345 let (delay_step, position, positions_moved, _) =
1346 consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest);
1347
1348 assert_eq!(position, 7);
1349
1350 assert_eq!(delay_step, Duration::from_secs(14));
1352 assert!(!positions_moved > 0);
1353 }
1354
/// For many random digests, every authority gets a valid position and
/// exactly one authority gets position 0.
#[test]
fn test_position_submit_certificate() {
    const NUM_TEST_TRANSACTIONS: usize = 1000;

    // Deterministic committee of 10 authorities.
    let mut rng = StdRng::from_seed([0; 32]);
    let committee = test_committee(&mut rng, 10);

    for _ in 0..NUM_TEST_TRANSACTIONS {
        let tx_digest = TransactionDigest::generate(&mut rng);

        let mut first_seen = false;
        for (name, _) in committee.members() {
            let position = position_submit_certificate(&committee, name, &tx_digest);
            assert!(position < committee.num_members());
            if position == 0 {
                // At most one authority may be first.
                assert!(!first_seen);
                first_seen = true;
            }
        }
        // At least one authority must be first.
        assert!(first_seen);
    }
}
1380}