iota_replay/
transaction_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::VecDeque,
7    fmt::{Debug, Formatter},
8    str::FromStr,
9};
10
11use iota_sdk::IotaClientBuilder;
12use iota_types::digests::TransactionDigest;
13use tracing::info;
14
15use crate::{
16    data_fetcher::{DataFetcher, RemoteFetcher},
17    types::{MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD, ReplayEngineError},
18};
19
/// Checkpoint ID that tailing falls back to when no explicit start point was
/// given and no checkpoint has been loaded yet.
const VALID_CHECKPOINT_START: u64 = 1;
21
22#[derive(Clone, Debug)]
23pub enum TransactionSource {
24    /// Fetch a random transaction from the network
25    Random,
26    /// Fetch a transaction from the network with a specific checkpoint ID
27    FromCheckpoint(u64),
28    /// Use the latest transaction from the network
29    TailLatest { start: Option<FuzzStartPoint> },
30    /// Use a random transaction from an inclusive range of checkpoint IDs
31    FromInclusiveCheckpointRange {
32        checkpoint_start: u64,
33        checkpoint_end: u64,
34    },
35}
36
37#[derive(Clone)]
38pub struct TransactionProvider {
39    pub fetcher: RemoteFetcher,
40    pub source: TransactionSource,
41    pub last_checkpoint: Option<u64>,
42    pub transactions_left: VecDeque<TransactionDigest>,
43}
44
45#[derive(Eq, PartialEq, Clone, Copy, PartialOrd, Ord, Hash, Debug)]
46pub enum FuzzStartPoint {
47    Checkpoint(u64),
48    TxDigest(TransactionDigest),
49}
50
51impl FromStr for FuzzStartPoint {
52    type Err = anyhow::Error;
53
54    fn from_str(s: &str) -> Result<Self, Self::Err> {
55        match s.parse::<u64>() {
56            Ok(n) => Ok(FuzzStartPoint::Checkpoint(n)),
57            Err(u64_err) => match TransactionDigest::from_str(s) {
58                Ok(d) => Ok(FuzzStartPoint::TxDigest(d)),
59                Err(tx_err) => {
60                    info!(
61                        "{} is not a valid checkpoint (err: {:?}) or transaction digest (err: {:?})",
62                        s, u64_err, tx_err
63                    );
64                    Err(tx_err)
65                }
66            },
67        }
68    }
69}
70
71impl Debug for TransactionProvider {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("TransactionProvider")
74            // TODO: impl Debug for fetcher
75            //.field("fetcher", &self.fetcher)
76            .field("source", &self.source)
77            .field("last_checkpoint", &self.last_checkpoint)
78            .field("transactions_left", &self.transactions_left)
79            .finish()
80    }
81}
82
83impl TransactionProvider {
84    pub async fn new(http_url: &str, source: TransactionSource) -> Result<Self, ReplayEngineError> {
85        Ok(Self {
86            fetcher: RemoteFetcher::new(
87                IotaClientBuilder::default()
88                    .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
89                    .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
90                    .build(http_url)
91                    .await?,
92            ),
93            source,
94            last_checkpoint: None,
95            transactions_left: VecDeque::new(),
96        })
97    }
98
99    pub async fn next(&mut self) -> Result<Option<TransactionDigest>, ReplayEngineError> {
100        let tx = match self.source {
101            TransactionSource::Random => {
102                let tx = self.fetcher.fetch_random_transaction(None, None).await?;
103                Some(tx)
104            }
105            TransactionSource::FromCheckpoint(checkpoint_id) => {
106                let tx = self
107                    .fetcher
108                    .fetch_random_transaction(Some(checkpoint_id), Some(checkpoint_id))
109                    .await?;
110                Some(tx)
111            }
112            TransactionSource::TailLatest { start } => {
113                if let Some(tx) = self.transactions_left.pop_front() {
114                    Some(tx)
115                } else {
116                    let next_checkpoint = match start {
117                        Some(x) => match x {
118                            // Checkpoint specified
119                            FuzzStartPoint::Checkpoint(checkpoint_id) => {
120                                self.source = TransactionSource::TailLatest {
121                                    start: Some(FuzzStartPoint::Checkpoint(checkpoint_id + 1)),
122                                };
123                                Some(checkpoint_id)
124                            }
125                            // Digest specified. Find the checkpoint for the digest
126                            FuzzStartPoint::TxDigest(tx_digest) => {
127                                let ch = self
128                                    .fetcher
129                                    .get_transaction(&tx_digest)
130                                    .await?
131                                    .checkpoint
132                                    .expect("Transaction must have a checkpoint");
133                                // For the next round
134                                self.source = TransactionSource::TailLatest {
135                                    start: Some(FuzzStartPoint::Checkpoint(ch + 1)),
136                                };
137                                Some(ch)
138                            }
139                        },
140                        // Advance to next checkpoint if available
141                        None => self.last_checkpoint.map(|c| c + 1),
142                    }
143                    .unwrap_or(VALID_CHECKPOINT_START);
144
145                    self.transactions_left = self
146                        .fetcher
147                        .get_checkpoint_txs(next_checkpoint)
148                        .await?
149                        .into();
150                    self.last_checkpoint = Some(next_checkpoint);
151                    self.transactions_left.pop_front()
152                }
153            }
154            TransactionSource::FromInclusiveCheckpointRange {
155                checkpoint_start,
156                checkpoint_end,
157            } => {
158                let tx = self
159                    .fetcher
160                    .fetch_random_transaction(Some(checkpoint_start), Some(checkpoint_end))
161                    .await?;
162                Some(tx)
163            }
164        };
165
166        Ok(tx)
167    }
168}