iota_core/
mysticeti_adapter.rs1use std::{sync::Arc, time::Duration};
6
7use arc_swap::{ArcSwapOption, Guard};
8use consensus_core::{ClientError, TransactionClient};
9use iota_types::{
10 error::{IotaError, IotaResult},
11 messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
12};
13use tap::prelude::*;
14use tokio::time::{Instant, sleep};
15use tracing::{error, info, warn};
16
17use crate::{
18 authority::authority_per_epoch_store::AuthorityPerEpochStore,
19 consensus_adapter::{BlockStatusReceiver, ConsensusClient},
20 consensus_handler::SequencedConsensusTransactionKey,
21};
22
23#[derive(Default, Clone)]
27pub struct LazyMysticetiClient {
28 client: Arc<ArcSwapOption<TransactionClient>>,
29}
30
31impl LazyMysticetiClient {
32 pub fn new() -> Self {
33 Self {
34 client: Arc::new(ArcSwapOption::empty()),
35 }
36 }
37
38 async fn get(&self) -> Guard<Option<Arc<TransactionClient>>> {
39 let client = self.client.load();
40 if client.is_some() {
41 return client;
42 }
43
44 let mut count = 0;
49 let start = Instant::now();
50 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
51 loop {
52 let client = self.client.load();
53 if client.is_some() {
54 return client;
55 } else {
56 sleep(RETRY_INTERVAL).await;
57 count += 1;
58 if count % 100 == 0 {
59 warn!(
60 "Waiting for consensus to initialize after {:?}",
61 Instant::now() - start
62 );
63 }
64 }
65 }
66 }
67
68 pub fn set(&self, client: Arc<TransactionClient>) {
69 self.client.store(Some(client));
70 }
71
72 pub fn clear(&self) {
73 self.client.store(None);
74 }
75}
76
77#[async_trait::async_trait]
78impl ConsensusClient for LazyMysticetiClient {
79 async fn submit(
80 &self,
81 transactions: &[ConsensusTransaction],
82 _epoch_store: &Arc<AuthorityPerEpochStore>,
83 ) -> IotaResult<BlockStatusReceiver> {
84 let client = self.get().await;
88 let transactions_bytes = transactions
89 .iter()
90 .map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"))
91 .collect::<Vec<_>>();
92 let (block_ref, status_waiter) = client
93 .as_ref()
94 .expect("Client should always be returned")
95 .submit(transactions_bytes)
96 .await
97 .tap_err(|err| {
98 let msg = format!("Transaction submission failed with: {:?}", err);
100 match err {
101 ClientError::ConsensusShuttingDown(_) => {
102 info!("{}", msg);
103 }
104 ClientError::OversizedTransaction(_, _)
105 | ClientError::OversizedTransactionBundleBytes(_, _)
106 | ClientError::OversizedTransactionBundleCount(_, _) => {
107 if cfg!(debug_assertions) {
108 panic!("{}", msg);
109 } else {
110 error!("{}", msg);
111 }
112 }
113 };
114 })
115 .map_err(|err| IotaError::FailedToSubmitToConsensus(err.to_string()))?;
116
117 let is_soft_bundle = transactions.len() > 1;
118
119 if !is_soft_bundle
120 && matches!(
121 transactions[0].kind,
122 ConsensusTransactionKind::EndOfPublish(_)
123 | ConsensusTransactionKind::CapabilityNotificationV1(_)
124 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
125 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
126 )
127 {
128 let transaction_key = SequencedConsensusTransactionKey::External(transactions[0].key());
129 tracing::info!("Transaction {transaction_key:?} was included in {block_ref}",)
130 };
131 Ok(status_waiter)
132 }
133}