iota_replay/
batch_replay.rs1use std::{
6 collections::VecDeque,
7 path::PathBuf,
8 sync::{Arc, atomic::AtomicUsize},
9};
10
11use futures::{FutureExt, future::join_all};
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_types::base_types::TransactionDigest;
14use parking_lot::Mutex;
15use tokio::time::Instant;
16use tracing::{error, info};
17
18use crate::{
19 replay::{ExecutionSandboxState, LocalExec},
20 types::ReplayEngineError,
21};
22
23pub async fn batch_replay(
28 tx_digests: impl Iterator<Item = TransactionDigest>,
29 num_tasks: u64,
30 rpc_url: String,
31 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
32 use_authority: bool,
33 terminate_early: bool,
34 persist_path: Option<PathBuf>,
35) {
36 let provider = Arc::new(TransactionDigestProvider::new(tx_digests));
37 let cancel = tokio_util::sync::CancellationToken::new();
38 let mut tasks = vec![];
39 let cur_time = Instant::now();
40 for _ in 0..num_tasks {
41 let provider = provider.clone();
42 let expensive_safety_check_config = expensive_safety_check_config.clone();
43 let rpc_url_ref = rpc_url.as_ref();
44 let cancel = cancel.clone();
45 let persist_path_ref = persist_path.as_ref();
46 tasks.push(run_task(
47 provider,
48 rpc_url_ref,
49 expensive_safety_check_config,
50 use_authority,
51 terminate_early,
52 cancel,
53 persist_path_ref,
54 ));
55 }
56 let all_failed_transactions: Vec<_> = join_all(tasks).await.into_iter().flatten().collect();
57 info!(
58 "Finished replaying {} transactions, took {:?}",
59 provider.get_executed_count(),
60 cur_time.elapsed()
61 );
62 if all_failed_transactions.is_empty() {
63 info!("All transactions passed");
64 } else {
65 error!("Some transactions failed: {:?}", all_failed_transactions);
66 }
67}
68
69struct TransactionDigestProvider {
70 digests: Mutex<VecDeque<TransactionDigest>>,
71 total_count: usize,
72 executed_count: AtomicUsize,
73}
74
75impl TransactionDigestProvider {
76 pub fn new(digests: impl Iterator<Item = TransactionDigest>) -> Self {
77 let digests: VecDeque<_> = digests.collect();
78 let total_count = digests.len();
79 Self {
80 digests: Mutex::new(digests),
81 total_count,
82 executed_count: AtomicUsize::new(0),
83 }
84 }
85
86 pub fn get_total_count(&self) -> usize {
87 self.total_count
88 }
89
90 pub fn get_executed_count(&self) -> usize {
91 self.executed_count
92 .load(std::sync::atomic::Ordering::Relaxed)
93 }
94
95 pub fn next_digest(&self) -> Option<(usize, TransactionDigest)> {
97 let next_digest = self.digests.lock().pop_front();
98 next_digest.map(|digest| {
99 let executed_count = self
100 .executed_count
101 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
102 (executed_count + 1, digest)
103 })
104 }
105}
106
107async fn run_task(
108 tx_digest_provider: Arc<TransactionDigestProvider>,
109 http_url: &str,
110 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
111 use_authority: bool,
112 terminate_early: bool,
113 cancel: tokio_util::sync::CancellationToken,
114 persist_path: Option<&PathBuf>,
115) -> Vec<ReplayEngineError> {
116 let total_count = tx_digest_provider.get_total_count();
117 let mut failed_transactions = vec![];
118 let mut executor = LocalExec::new_from_fn_url(http_url).await.unwrap();
119 while let Some((index, digest)) = tx_digest_provider.next_digest() {
120 if cancel.is_cancelled() {
121 break;
122 }
123 info!(
124 "[{}/{}] Replaying transaction {:?}...",
125 index, total_count, digest
126 );
127 let sandbox_persist_path = persist_path.map(|path| path.join(format!("{}.json", digest,)));
128 if let Some(p) = sandbox_persist_path.as_ref() {
129 if p.exists() {
130 info!(
131 "Skipping transaction {:?} as it has been replayed before",
132 digest
133 );
134 continue;
135 }
136 }
137 let async_func = execute_transaction(
138 &mut executor,
139 &digest,
140 expensive_safety_check_config.clone(),
141 use_authority,
142 )
143 .fuse();
144 let result = tokio::select! {
145 result = async_func => result,
146 _ = cancel.cancelled() => {
147 break;
148 }
149 };
150 match result {
151 Err(err) => {
152 error!("Replaying transaction {:?} failed: {:?}", digest, err);
153 failed_transactions.push(err.clone());
154 if terminate_early {
155 cancel.cancel();
156 break;
157 }
158 }
159 Ok(sandbox_state) => {
160 info!("Replaying transaction {:?} succeeded", digest);
161 if let Some(p) = sandbox_persist_path {
162 let out = serde_json::to_string(&sandbox_state).unwrap();
163 std::fs::write(p, out).unwrap();
164 }
165 }
166 }
167 }
168 failed_transactions
169}
170
171async fn execute_transaction(
172 executor: &mut LocalExec,
173 digest: &TransactionDigest,
174 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
175 use_authority: bool,
176) -> Result<ExecutionSandboxState, ReplayEngineError> {
177 *executor = loop {
178 match executor.clone().reset_for_new_execution_with_client().await {
179 Ok(executor) => break executor,
180 Err(err) => {
181 error!("Failed to reset executor: {:?}. Retrying in 3s", err);
182 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
183 }
184 }
185 };
186 let sandbox_state = loop {
187 let result = executor
188 .execute_transaction(
189 digest,
190 expensive_safety_check_config.clone(),
191 use_authority,
192 None,
193 None,
194 None,
195 None,
196 )
197 .await;
198 match result {
199 Ok(sandbox_state) => break sandbox_state,
200 err @ Err(ReplayEngineError::TransactionNotSupported { .. }) => {
201 return err;
202 }
203 Err(err) => {
204 error!("Failed to execute transaction: {:?}. Retrying in 3s", err);
205 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
206 }
207 }
208 };
209 sandbox_state.check_effects()?;
210 Ok(sandbox_state)
211}