iota_replay/
batch_replay.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::VecDeque,
7    path::PathBuf,
8    sync::{Arc, atomic::AtomicUsize},
9};
10
11use futures::{FutureExt, future::join_all};
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_types::base_types::TransactionDigest;
14use parking_lot::Mutex;
15use tokio::time::Instant;
16use tracing::{error, info};
17
18use crate::{
19    replay::{ExecutionSandboxState, LocalExec},
20    types::ReplayEngineError,
21};
22
23/// Given a list of transaction digests, replay them in parallel using
24/// `num_tasks` tasks. If `terminate_early` is true, the replay will terminate
25/// early if any transaction fails; otherwise it will try to finish all
26/// transactions.
27pub async fn batch_replay(
28    tx_digests: impl Iterator<Item = TransactionDigest>,
29    num_tasks: u64,
30    rpc_url: String,
31    expensive_safety_check_config: ExpensiveSafetyCheckConfig,
32    use_authority: bool,
33    terminate_early: bool,
34    persist_path: Option<PathBuf>,
35) {
36    let provider = Arc::new(TransactionDigestProvider::new(tx_digests));
37    let cancel = tokio_util::sync::CancellationToken::new();
38    let mut tasks = vec![];
39    let cur_time = Instant::now();
40    for _ in 0..num_tasks {
41        let provider = provider.clone();
42        let expensive_safety_check_config = expensive_safety_check_config.clone();
43        let rpc_url_ref = rpc_url.as_ref();
44        let cancel = cancel.clone();
45        let persist_path_ref = persist_path.as_ref();
46        tasks.push(run_task(
47            provider,
48            rpc_url_ref,
49            expensive_safety_check_config,
50            use_authority,
51            terminate_early,
52            cancel,
53            persist_path_ref,
54        ));
55    }
56    let all_failed_transactions: Vec<_> = join_all(tasks).await.into_iter().flatten().collect();
57    info!(
58        "Finished replaying {} transactions, took {:?}",
59        provider.get_executed_count(),
60        cur_time.elapsed()
61    );
62    if all_failed_transactions.is_empty() {
63        info!("All transactions passed");
64    } else {
65        error!("Some transactions failed: {:?}", all_failed_transactions);
66    }
67}
68
69struct TransactionDigestProvider {
70    digests: Mutex<VecDeque<TransactionDigest>>,
71    total_count: usize,
72    executed_count: AtomicUsize,
73}
74
75impl TransactionDigestProvider {
76    pub fn new(digests: impl Iterator<Item = TransactionDigest>) -> Self {
77        let digests: VecDeque<_> = digests.collect();
78        let total_count = digests.len();
79        Self {
80            digests: Mutex::new(digests),
81            total_count,
82            executed_count: AtomicUsize::new(0),
83        }
84    }
85
86    pub fn get_total_count(&self) -> usize {
87        self.total_count
88    }
89
90    pub fn get_executed_count(&self) -> usize {
91        self.executed_count
92            .load(std::sync::atomic::Ordering::Relaxed)
93    }
94
95    /// Returns the index and digest of the next transaction, if any.
96    pub fn next_digest(&self) -> Option<(usize, TransactionDigest)> {
97        let next_digest = self.digests.lock().pop_front();
98        next_digest.map(|digest| {
99            let executed_count = self
100                .executed_count
101                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
102            (executed_count + 1, digest)
103        })
104    }
105}
106
107async fn run_task(
108    tx_digest_provider: Arc<TransactionDigestProvider>,
109    http_url: &str,
110    expensive_safety_check_config: ExpensiveSafetyCheckConfig,
111    use_authority: bool,
112    terminate_early: bool,
113    cancel: tokio_util::sync::CancellationToken,
114    persist_path: Option<&PathBuf>,
115) -> Vec<ReplayEngineError> {
116    let total_count = tx_digest_provider.get_total_count();
117    let mut failed_transactions = vec![];
118    let mut executor = LocalExec::new_from_fn_url(http_url).await.unwrap();
119    while let Some((index, digest)) = tx_digest_provider.next_digest() {
120        if cancel.is_cancelled() {
121            break;
122        }
123        info!(
124            "[{}/{}] Replaying transaction {:?}...",
125            index, total_count, digest
126        );
127        let sandbox_persist_path = persist_path.map(|path| path.join(format!("{}.json", digest,)));
128        if let Some(p) = sandbox_persist_path.as_ref() {
129            if p.exists() {
130                info!(
131                    "Skipping transaction {:?} as it has been replayed before",
132                    digest
133                );
134                continue;
135            }
136        }
137        let async_func = execute_transaction(
138            &mut executor,
139            &digest,
140            expensive_safety_check_config.clone(),
141            use_authority,
142        )
143        .fuse();
144        let result = tokio::select! {
145            result = async_func => result,
146            _ = cancel.cancelled() => {
147                break;
148            }
149        };
150        match result {
151            Err(err) => {
152                error!("Replaying transaction {:?} failed: {:?}", digest, err);
153                failed_transactions.push(err.clone());
154                if terminate_early {
155                    cancel.cancel();
156                    break;
157                }
158            }
159            Ok(sandbox_state) => {
160                info!("Replaying transaction {:?} succeeded", digest);
161                if let Some(p) = sandbox_persist_path {
162                    let out = serde_json::to_string(&sandbox_state).unwrap();
163                    std::fs::write(p, out).unwrap();
164                }
165            }
166        }
167    }
168    failed_transactions
169}
170
171async fn execute_transaction(
172    executor: &mut LocalExec,
173    digest: &TransactionDigest,
174    expensive_safety_check_config: ExpensiveSafetyCheckConfig,
175    use_authority: bool,
176) -> Result<ExecutionSandboxState, ReplayEngineError> {
177    *executor = loop {
178        match executor.clone().reset_for_new_execution_with_client().await {
179            Ok(executor) => break executor,
180            Err(err) => {
181                error!("Failed to reset executor: {:?}. Retrying in 3s", err);
182                tokio::time::sleep(std::time::Duration::from_secs(3)).await;
183            }
184        }
185    };
186    let sandbox_state = loop {
187        let result = executor
188            .execute_transaction(
189                digest,
190                expensive_safety_check_config.clone(),
191                use_authority,
192                None,
193                None,
194                None,
195                None,
196            )
197            .await;
198        match result {
199            Ok(sandbox_state) => break sandbox_state,
200            err @ Err(ReplayEngineError::TransactionNotSupported { .. }) => {
201                return err;
202            }
203            Err(err) => {
204                error!("Failed to execute transaction: {:?}. Retrying in 3s", err);
205                tokio::time::sleep(std::time::Duration::from_secs(3)).await;
206            }
207        }
208    };
209    sandbox_state.check_effects()?;
210    Ok(sandbox_state)
211}