iota_core/
mysticeti_adapter.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use arc_swap::{ArcSwapOption, Guard};
8use consensus_core::{ClientError, TransactionClient};
9use iota_types::{
10    error::{IotaError, IotaResult},
11    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
12};
13use tap::prelude::*;
14use tokio::time::{Instant, sleep};
15use tracing::{error, info, warn};
16
17use crate::{
18    authority::authority_per_epoch_store::AuthorityPerEpochStore,
19    consensus_adapter::{BlockStatusReceiver, ConsensusClient},
20    consensus_handler::SequencedConsensusTransactionKey,
21};
22
23/// Gets a client to submit transactions to Mysticeti, or waits for one to be
24/// available. This hides the complexities of async consensus initialization and
25/// submitting to different instances of consensus across epochs.
26#[derive(Default, Clone)]
27pub struct LazyMysticetiClient {
28    client: Arc<ArcSwapOption<TransactionClient>>,
29}
30
31impl LazyMysticetiClient {
32    pub fn new() -> Self {
33        Self {
34            client: Arc::new(ArcSwapOption::empty()),
35        }
36    }
37
38    async fn get(&self) -> Guard<Option<Arc<TransactionClient>>> {
39        let client = self.client.load();
40        if client.is_some() {
41            return client;
42        }
43
44        // Consensus client is initialized after validators or epoch starts, and cleared
45        // after an epoch ends. But calls to get() can happen during validator
46        // startup or epoch change, before consensus finished initializations.
47        // TODO: maybe listen to updates from consensus manager instead of polling.
48        let mut count = 0;
49        let start = Instant::now();
50        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
51        loop {
52            let client = self.client.load();
53            if client.is_some() {
54                return client;
55            } else {
56                sleep(RETRY_INTERVAL).await;
57                count += 1;
58                if count % 100 == 0 {
59                    warn!(
60                        "Waiting for consensus to initialize after {:?}",
61                        Instant::now() - start
62                    );
63                }
64            }
65        }
66    }
67
68    pub fn set(&self, client: Arc<TransactionClient>) {
69        self.client.store(Some(client));
70    }
71
72    pub fn clear(&self) {
73        self.client.store(None);
74    }
75}
76
77#[async_trait::async_trait]
78impl ConsensusClient for LazyMysticetiClient {
79    async fn submit(
80        &self,
81        transactions: &[ConsensusTransaction],
82        _epoch_store: &Arc<AuthorityPerEpochStore>,
83    ) -> IotaResult<BlockStatusReceiver> {
84        // TODO(mysticeti): confirm comment is still true
85        // The retrieved TransactionClient can be from the past epoch. Submit would fail
86        // after Mysticeti shuts down, so there should be no correctness issue.
87        let client = self.get().await;
88        let transactions_bytes = transactions
89            .iter()
90            .map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"))
91            .collect::<Vec<_>>();
92        let (block_ref, status_waiter) = client
93            .as_ref()
94            .expect("Client should always be returned")
95            .submit(transactions_bytes)
96            .await
97            .tap_err(|err| {
98                // Will be logged by caller as well.
99                let msg = format!("Transaction submission failed with: {:?}", err);
100                match err {
101                    ClientError::ConsensusShuttingDown(_) => {
102                        info!("{}", msg);
103                    }
104                    ClientError::OversizedTransaction(_, _)
105                    | ClientError::OversizedTransactionBundleBytes(_, _)
106                    | ClientError::OversizedTransactionBundleCount(_, _) => {
107                        if cfg!(debug_assertions) {
108                            panic!("{}", msg);
109                        } else {
110                            error!("{}", msg);
111                        }
112                    }
113                };
114            })
115            .map_err(|err| IotaError::FailedToSubmitToConsensus(err.to_string()))?;
116
117        let is_soft_bundle = transactions.len() > 1;
118
119        if !is_soft_bundle
120            && matches!(
121                transactions[0].kind,
122                ConsensusTransactionKind::EndOfPublish(_)
123                    | ConsensusTransactionKind::CapabilityNotificationV1(_)
124                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
125                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
126            )
127        {
128            let transaction_key = SequencedConsensusTransactionKey::External(transactions[0].key());
129            tracing::info!("Transaction {transaction_key:?} was included in {block_ref}",)
130        };
131        Ok(status_waiter)
132    }
133}