iota_replay/
transaction_provider.rs1use std::{
6 collections::VecDeque,
7 fmt::{Debug, Formatter},
8 str::FromStr,
9};
10
11use iota_sdk::IotaClientBuilder;
12use iota_types::digests::TransactionDigest;
13use tracing::info;
14
15use crate::{
16 data_fetcher::{DataFetcher, RemoteFetcher},
17 types::{MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD, ReplayEngineError},
18};
19
/// Checkpoint sequence number that `TransactionSource::TailLatest` begins from when
/// no start point was supplied and no checkpoint has been fetched yet.
const VALID_CHECKPOINT_START: u64 = 1_u64;
21
22#[derive(Clone, Debug)]
23pub enum TransactionSource {
24 Random,
26 FromCheckpoint(u64),
28 TailLatest { start: Option<FuzzStartPoint> },
30 FromInclusiveCheckpointRange {
32 checkpoint_start: u64,
33 checkpoint_end: u64,
34 },
35}
36
37#[derive(Clone)]
38pub struct TransactionProvider {
39 pub fetcher: RemoteFetcher,
40 pub source: TransactionSource,
41 pub last_checkpoint: Option<u64>,
42 pub transactions_left: VecDeque<TransactionDigest>,
43}
44
45#[derive(Eq, PartialEq, Clone, Copy, PartialOrd, Ord, Hash, Debug)]
46pub enum FuzzStartPoint {
47 Checkpoint(u64),
48 TxDigest(TransactionDigest),
49}
50
51impl FromStr for FuzzStartPoint {
52 type Err = anyhow::Error;
53
54 fn from_str(s: &str) -> Result<Self, Self::Err> {
55 match s.parse::<u64>() {
56 Ok(n) => Ok(FuzzStartPoint::Checkpoint(n)),
57 Err(u64_err) => match TransactionDigest::from_str(s) {
58 Ok(d) => Ok(FuzzStartPoint::TxDigest(d)),
59 Err(tx_err) => {
60 info!(
61 "{} is not a valid checkpoint (err: {:?}) or transaction digest (err: {:?})",
62 s, u64_err, tx_err
63 );
64 Err(tx_err)
65 }
66 },
67 }
68 }
69}
70
71impl Debug for TransactionProvider {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("TransactionProvider")
74 .field("source", &self.source)
77 .field("last_checkpoint", &self.last_checkpoint)
78 .field("transactions_left", &self.transactions_left)
79 .finish()
80 }
81}
82
83impl TransactionProvider {
84 pub async fn new(http_url: &str, source: TransactionSource) -> Result<Self, ReplayEngineError> {
85 Ok(Self {
86 fetcher: RemoteFetcher::new(
87 IotaClientBuilder::default()
88 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
89 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
90 .build(http_url)
91 .await?,
92 ),
93 source,
94 last_checkpoint: None,
95 transactions_left: VecDeque::new(),
96 })
97 }
98
99 pub async fn next(&mut self) -> Result<Option<TransactionDigest>, ReplayEngineError> {
100 let tx = match self.source {
101 TransactionSource::Random => {
102 let tx = self.fetcher.fetch_random_transaction(None, None).await?;
103 Some(tx)
104 }
105 TransactionSource::FromCheckpoint(checkpoint_id) => {
106 let tx = self
107 .fetcher
108 .fetch_random_transaction(Some(checkpoint_id), Some(checkpoint_id))
109 .await?;
110 Some(tx)
111 }
112 TransactionSource::TailLatest { start } => {
113 if let Some(tx) = self.transactions_left.pop_front() {
114 Some(tx)
115 } else {
116 let next_checkpoint = match start {
117 Some(x) => match x {
118 FuzzStartPoint::Checkpoint(checkpoint_id) => {
120 self.source = TransactionSource::TailLatest {
121 start: Some(FuzzStartPoint::Checkpoint(checkpoint_id + 1)),
122 };
123 Some(checkpoint_id)
124 }
125 FuzzStartPoint::TxDigest(tx_digest) => {
127 let ch = self
128 .fetcher
129 .get_transaction(&tx_digest)
130 .await?
131 .checkpoint
132 .expect("Transaction must have a checkpoint");
133 self.source = TransactionSource::TailLatest {
135 start: Some(FuzzStartPoint::Checkpoint(ch + 1)),
136 };
137 Some(ch)
138 }
139 },
140 None => self.last_checkpoint.map(|c| c + 1),
142 }
143 .unwrap_or(VALID_CHECKPOINT_START);
144
145 self.transactions_left = self
146 .fetcher
147 .get_checkpoint_txs(next_checkpoint)
148 .await?
149 .into();
150 self.last_checkpoint = Some(next_checkpoint);
151 self.transactions_left.pop_front()
152 }
153 }
154 TransactionSource::FromInclusiveCheckpointRange {
155 checkpoint_start,
156 checkpoint_end,
157 } => {
158 let tx = self
159 .fetcher
160 .fetch_random_transaction(Some(checkpoint_start), Some(checkpoint_end))
161 .await?;
162 Some(tx)
163 }
164 };
165
166 Ok(tx)
167 }
168}