iota_replay/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{cmp::max, env, io::BufRead, path::PathBuf, str::FromStr};
6
7use async_recursion::async_recursion;
8use clap::Parser;
9use config::ReplayableNetworkConfigSet;
10use fuzz::{ReplayFuzzer, ReplayFuzzerConfig};
11use fuzz_mutations::base_fuzzers;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_protocol_config::Chain;
14use iota_types::{
15    base_types::{ObjectID, SequenceNumber},
16    digests::{TransactionDigest, get_mainnet_chain_identifier, get_testnet_chain_identifier},
17    message_envelope::Message,
18};
19use move_vm_config::runtime::get_default_output_filepath;
20use tracing::{error, info, warn};
21use transaction_provider::{FuzzStartPoint, TransactionSource};
22
23use crate::{
24    config::get_rpc_url,
25    replay::{ExecutionSandboxState, LocalExec, ProtocolVersionSummary},
26};
27
28pub mod batch_replay;
29pub mod config;
30mod data_fetcher;
31mod displays;
32pub mod fuzz;
33pub mod fuzz_mutations;
34mod replay;
35#[cfg(test)]
36mod tests;
37pub mod transaction_provider;
38pub mod types;
39
40static DEFAULT_SANDBOX_BASE_PATH: &str =
41    concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sandbox_snapshots");
42
43#[derive(Parser, Clone)]
44pub enum ReplayToolCommand {
45    /// Generate a new network config file
46    #[command(name = "gen")]
47    GenerateDefaultConfig,
48
49    /// Persist sandbox state
50    #[command(name = "ps")]
51    PersistSandbox {
52        #[arg(long, short)]
53        tx_digest: String,
54        #[arg(long, short, default_value = DEFAULT_SANDBOX_BASE_PATH)]
55        base_path: PathBuf,
56    },
57
58    /// Replay from sandbox state file
59    /// This is a completely local execution
60    #[command(name = "rs")]
61    ReplaySandbox {
62        #[arg(long, short)]
63        path: PathBuf,
64    },
65
66    /// Profile transaction
67    #[command(name = "rp")]
68    ProfileTransaction {
69        #[arg(long, short)]
70        tx_digest: String,
71        /// Optional version of the executor to use, if not specified defaults
72        /// to the one originally used for the transaction.
73        #[arg(long, short, allow_hyphen_values = true)]
74        executor_version: Option<i64>,
75        /// Optional protocol version to use, if not specified defaults to the
76        /// one originally used for the transaction.
77        #[arg(long, short, allow_hyphen_values = true)]
78        protocol_version: Option<i64>,
79        /// Optional output filepath for the profile generated by this run, if
80        /// not specified defaults to
81        /// `gas_profile_{tx_digest}_{unix_timestamp}.json in the working
82        /// directory.
83        #[arg(long, short, allow_hyphen_values = true)]
84        profile_output: Option<PathBuf>,
85        /// Required config objects and versions of the config objects to use if
86        /// replaying a transaction that utilizes the config object for
87        /// regulated coin types and that has been denied.
88        #[arg(long, num_args = 2..)]
89        config_objects: Option<Vec<String>>,
90    },
91
92    /// Replay transaction
93    #[command(name = "tx")]
94    ReplayTransaction {
95        #[arg(long, short)]
96        tx_digest: String,
97        #[arg(long, short)]
98        show_effects: bool,
99        /// Optional version of the executor to use, if not specified defaults
100        /// to the one originally used for the transaction.
101        #[arg(long, short, allow_hyphen_values = true)]
102        executor_version: Option<i64>,
103        /// Optional protocol version to use, if not specified defaults to the
104        /// one originally used for the transaction.
105        #[arg(long, short, allow_hyphen_values = true)]
106        protocol_version: Option<i64>,
107        /// Required config objects and versions of the config objects to use if
108        /// replaying a transaction that utilizes the config object for
109        /// regulated coin types and that has been denied.
110        #[arg(long, num_args = 2..)]
111        config_objects: Option<Vec<String>>,
112    },
113
114    /// Replay transactions listed in a file
115    #[command(name = "rb")]
116    ReplayBatch {
117        #[arg(long, short)]
118        path: PathBuf,
119        #[arg(long, short)]
120        terminate_early: bool,
121        #[arg(
122            long,
123            short,
124            default_value = "16",
125            help = "Number of tasks to run in parallel"
126        )]
127        num_tasks: u64,
128        #[arg(
129            long,
130            help = "If provided, dump the state of the execution to a file in the given directory. \
131            This will allow faster replay next time."
132        )]
133        persist_path: Option<PathBuf>,
134    },
135
136    /// Replay a transaction from a node state dump
137    #[command(name = "rd")]
138    ReplayDump {
139        #[arg(long, short)]
140        path: String,
141        #[arg(long, short)]
142        show_effects: bool,
143    },
144
145    /// Replay multiple transactions from JSON files that contain the sandbox
146    /// persisted state.
147    #[command(name = "brd")]
148    BatchReplayFromSandbox {
149        #[arg(
150            help = "The path to the directory that contains many JSON files, each representing a persisted sandbox.\
151            These files are typically generated by running the ReplayBatch command with --persist-path specified."
152        )]
153        path: String,
154        #[arg(
155            long,
156            short,
157            default_value = "64",
158            help = "Number of tasks to run in parallel"
159        )]
160        num_tasks: usize,
161    },
162
163    /// Replay all transactions in a range of checkpoints
164    #[command(name = "ch")]
165    ReplayCheckpoints {
166        #[arg(long, short)]
167        start: u64,
168        #[arg(long, short)]
169        end: u64,
170        #[arg(long, short)]
171        terminate_early: bool,
172        #[arg(long, short, default_value = "16")]
173        max_tasks: u64,
174    },
175
176    /// Replay all transactions in an epoch
177    #[command(name = "ep")]
178    ReplayEpoch {
179        #[arg(long, short)]
180        epoch: u64,
181        #[arg(long, short)]
182        terminate_early: bool,
183        #[arg(long, short, default_value = "16")]
184        max_tasks: u64,
185    },
186
187    /// Run the replay based fuzzer
188    #[command(name = "fz")]
189    Fuzz {
190        #[arg(long, short)]
191        start: Option<FuzzStartPoint>,
192        #[arg(long, short)]
193        num_mutations_per_base: u64,
194        #[arg(long, short = 'b', default_value = "18446744073709551614")]
195        num_base_transactions: u64,
196    },
197
198    Report,
199}
200
201#[async_recursion]
202pub async fn execute_replay_command(
203    rpc_url: Option<String>,
204    safety_checks: bool,
205    use_authority: bool,
206    cfg_path: Option<PathBuf>,
207    chain: Option<String>,
208    cmd: ReplayToolCommand,
209) -> anyhow::Result<Option<(u64, u64)>> {
210    let safety = if safety_checks {
211        ExpensiveSafetyCheckConfig::new_enable_all()
212    } else {
213        ExpensiveSafetyCheckConfig::default()
214    };
215    Ok(match cmd {
216        ReplayToolCommand::ReplaySandbox { path } => {
217            let contents = std::fs::read_to_string(path)?;
218            let sandbox_state: ExecutionSandboxState = serde_json::from_str(&contents)?;
219            info!("Executing tx: {}", sandbox_state.transaction_info.tx_digest);
220            let sandbox_state =
221                LocalExec::certificate_execute_with_sandbox_state(&sandbox_state).await?;
222            sandbox_state.check_effects()?;
223            info!("Execution finished successfully. Local and on-chain effects match.");
224            None
225        }
226        ReplayToolCommand::PersistSandbox {
227            tx_digest,
228            base_path,
229        } => {
230            let tx_digest = TransactionDigest::from_str(&tx_digest)?;
231            info!("Executing tx: {}", tx_digest);
232            let sandbox_state = LocalExec::replay_with_network_config(
233                get_rpc_url(rpc_url, cfg_path, chain)?,
234                tx_digest,
235                safety,
236                use_authority,
237                None,
238                None,
239                None,
240                None,
241            )
242            .await?;
243
244            let out = serde_json::to_string(&sandbox_state).unwrap();
245            let path = base_path.join(format!("{}.json", tx_digest));
246            std::fs::write(path, out)?;
247            None
248        }
249        ReplayToolCommand::GenerateDefaultConfig => {
250            let set = ReplayableNetworkConfigSet::default();
251            let path = set.save_config(None).unwrap();
252            println!("Default config saved to: {}", path.to_str().unwrap());
253            warn!("Note: default config nodes might prune epochs/objects");
254            None
255        }
256        ReplayToolCommand::Fuzz {
257            start,
258            num_mutations_per_base,
259            num_base_transactions,
260        } => {
261            let config = ReplayFuzzerConfig {
262                num_mutations_per_base,
263                mutator: Box::new(base_fuzzers(num_mutations_per_base)),
264                tx_source: TransactionSource::TailLatest { start },
265                fail_over_on_err: false,
266                expensive_safety_check_config: Default::default(),
267            };
268            let fuzzer = ReplayFuzzer::new(get_rpc_url(rpc_url, cfg_path, chain)?, config)
269                .await
270                .unwrap();
271            fuzzer.run(num_base_transactions).await.unwrap();
272            None
273        }
274        ReplayToolCommand::ReplayDump { path, show_effects } => {
275            let mut lx = LocalExec::new_for_state_dump(&path, rpc_url).await?;
276            let (sandbox_state, node_dump_state) = lx.execute_state_dump(safety).await?;
277            if show_effects {
278                println!("{:#?}", sandbox_state.local_exec_effects);
279            }
280
281            sandbox_state.check_effects()?;
282
283            let effects = node_dump_state.computed_effects.digest();
284            if effects != node_dump_state.expected_effects_digest {
285                error!(
286                    "Effects digest mismatch for {}: expected: {:?}, got: {:?}",
287                    node_dump_state.tx_digest, node_dump_state.expected_effects_digest, effects,
288                );
289                anyhow::bail!("Effects mismatch");
290            }
291
292            info!("Execution finished successfully. Local and on-chain effects match.");
293            Some((1u64, 1u64))
294        }
295        ReplayToolCommand::ReplayBatch {
296            path,
297            terminate_early,
298            num_tasks,
299            persist_path,
300        } => {
301            let file = std::fs::File::open(path).unwrap();
302            let buf_reader = std::io::BufReader::new(file);
303            let digests = buf_reader.lines().map(|line| {
304                let line = line.unwrap();
305                TransactionDigest::from_str(&line).unwrap_or_else(|err| {
306                    panic!("Error parsing tx digest {:?}: {:?}", line, err);
307                })
308            });
309            batch_replay::batch_replay(
310                digests,
311                num_tasks,
312                get_rpc_url(rpc_url, cfg_path, chain)?,
313                safety,
314                use_authority,
315                terminate_early,
316                persist_path,
317            )
318            .await;
319
320            // TODO: clean this up
321            Some((0u64, 0u64))
322        }
323        ReplayToolCommand::BatchReplayFromSandbox { path, num_tasks } => {
324            let files: Vec<_> = std::fs::read_dir(path)?
325                .filter_map(|entry| {
326                    let path = entry.ok()?.path();
327                    if path.is_file() {
328                        path.to_str().map(|p| p.to_owned())
329                    } else {
330                        None
331                    }
332                })
333                .collect();
334            info!("Replaying {} files", files.len());
335            let chunks = files.chunks(max(files.len() / num_tasks, 1));
336            let tasks = chunks.into_iter().map(|chunk| async move {
337                for file in chunk {
338                    info!("Replaying from state dump file {}", file);
339                    let contents = std::fs::read_to_string(file).unwrap();
340                    let sandbox_state: ExecutionSandboxState =
341                        serde_json::from_str(&contents).unwrap();
342                    let sandbox_state =
343                        LocalExec::certificate_execute_with_sandbox_state(&sandbox_state)
344                            .await
345                            .unwrap();
346                    sandbox_state.check_effects().unwrap();
347                }
348            });
349            futures::future::join_all(tasks).await;
350
351            // TODO: WTF is this
352            Some((0u64, 0u64))
353        }
354        ReplayToolCommand::ProfileTransaction {
355            tx_digest,
356            executor_version,
357            protocol_version,
358            profile_output,
359            config_objects,
360        } => {
361            let output_path = profile_output.or(Some(get_default_output_filepath()));
362
363            let tx_digest = TransactionDigest::from_str(&tx_digest)?;
364            info!("Executing tx: {}", tx_digest);
365            let _sandbox_state = LocalExec::replay_with_network_config(
366                get_rpc_url(rpc_url, cfg_path, chain)?,
367                tx_digest,
368                safety,
369                use_authority,
370                executor_version,
371                protocol_version,
372                output_path,
373                parse_configs_versions(config_objects),
374            )
375            .await?;
376
377            println!("Execution finished successfully.");
378            Some((1u64, 1u64))
379        }
380
381        ReplayToolCommand::ReplayTransaction {
382            tx_digest,
383            show_effects,
384            executor_version,
385            protocol_version,
386            config_objects,
387        } => {
388            let tx_digest = TransactionDigest::from_str(&tx_digest)?;
389            info!("Executing tx: {}", tx_digest);
390            let sandbox_state = LocalExec::replay_with_network_config(
391                get_rpc_url(rpc_url, cfg_path, chain)?,
392                tx_digest,
393                safety,
394                use_authority,
395                executor_version,
396                protocol_version,
397                None,
398                parse_configs_versions(config_objects),
399            )
400            .await?;
401
402            if show_effects {
403                println!("{}", sandbox_state.local_exec_effects);
404            }
405
406            sandbox_state.check_effects()?;
407
408            println!("Execution finished successfully. Local and on-chain effects match.");
409            Some((1u64, 1u64))
410        }
411
412        ReplayToolCommand::Report => {
413            let mut lx =
414                LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided")).await?;
415            let epoch_table = lx.protocol_ver_to_epoch_map().await?;
416
417            // We need this for other activities in this session
418            lx.current_protocol_version = *epoch_table.keys().peekable().last().unwrap();
419
420            println!(
421                "  Protocol Version  |                Epoch Change TX               |      Epoch Range     |   Checkpoint Range   "
422            );
423            println!(
424                "---------------------------------------------------------------------------------------------------------------"
425            );
426
427            for (
428                protocol_version,
429                ProtocolVersionSummary {
430                    epoch_change_tx: tx_digest,
431                    epoch_start: start_epoch,
432                    epoch_end: end_epoch,
433                    checkpoint_start,
434                    checkpoint_end,
435                    ..
436                },
437            ) in epoch_table
438            {
439                println!(
440                    " {:^16}   | {:^43} | {:^10}-{:^10}| {:^10}-{:^10} ",
441                    protocol_version,
442                    tx_digest,
443                    start_epoch,
444                    end_epoch,
445                    checkpoint_start.unwrap_or(u64::MAX),
446                    checkpoint_end.unwrap_or(u64::MAX)
447                );
448            }
449
450            lx.populate_protocol_version_tables().await?;
451            for x in lx.protocol_version_system_package_table {
452                println!("Protocol version: {}", x.0);
453                for (package_id, seq_num) in x.1 {
454                    println!("Package: {} Seq: {}", package_id, seq_num);
455                }
456            }
457            None
458        }
459
460        ReplayToolCommand::ReplayCheckpoints {
461            start,
462            end,
463            terminate_early,
464            max_tasks,
465        } => {
466            assert!(start <= end, "Start checkpoint must be <= end checkpoint");
467            assert!(max_tasks > 0, "Max tasks must be > 0");
468            let checkpoints_per_task = ((end - start + max_tasks) / max_tasks) as usize;
469            let mut handles = vec![];
470            info!(
471                "Executing checkpoints {} to {} with at most {} tasks and at most {} checkpoints per task",
472                start, end, max_tasks, checkpoints_per_task
473            );
474
475            let range: Vec<_> = (start..=end).collect();
476            for (task_count, checkpoints) in range.chunks(checkpoints_per_task).enumerate() {
477                let checkpoints = checkpoints.to_vec();
478                let rpc_url = rpc_url.clone();
479                let safety = safety.clone();
480                handles.push(tokio::spawn(async move {
481                    info!("Spawning task {task_count} for checkpoints {checkpoints:?}");
482                    let time = std::time::Instant::now();
483                    let (succeeded, total) = LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided"))
484                        .await
485                        .unwrap()
486                        .init_for_execution()
487                        .await
488                        .unwrap()
489                        .execute_all_in_checkpoints(&checkpoints, &safety, terminate_early, use_authority)
490                        .await
491                        .unwrap();
492                    let time = time.elapsed();
493                    info!(
494                        "Task {task_count}: executed checkpoints {:?} @ {} total transactions, {} succeeded",
495                        checkpoints, total, succeeded
496                    );
497                    (succeeded, total, time)
498                }));
499            }
500
501            let mut total_tx = 0;
502            let mut total_time_ms = 0;
503            let mut total_succeeded = 0;
504            futures::future::join_all(handles)
505                .await
506                .into_iter()
507                .for_each(|x| match x {
508                    Ok((succeeded, total, time)) => {
509                        total_tx += total;
510                        total_time_ms += time.as_millis() as u64;
511                        total_succeeded += succeeded;
512                    }
513                    Err(e) => {
514                        error!("Task failed: {:?}", e);
515                    }
516                });
517            info!(
518                "Executed {} checkpoints @ {}/{} total TXs succeeded in {} ms ({}) avg TX/s",
519                end - start + 1,
520                total_succeeded,
521                total_tx,
522                total_time_ms,
523                (total_tx as f64) / (total_time_ms as f64 / 1000.0)
524            );
525            Some((total_succeeded, total_tx))
526        }
527        ReplayToolCommand::ReplayEpoch {
528            epoch,
529            terminate_early,
530            max_tasks,
531        } => {
532            let lx =
533                LocalExec::new_from_fn_url(&rpc_url.clone().expect("Url must be provided")).await?;
534
535            let (start, end) = lx.checkpoints_for_epoch(epoch).await?;
536
537            info!(
538                "Executing epoch {} (checkpoint range {}-{}) with at most {} tasks",
539                epoch, start, end, max_tasks
540            );
541            let status = execute_replay_command(
542                rpc_url,
543                safety_checks,
544                use_authority,
545                cfg_path,
546                chain,
547                ReplayToolCommand::ReplayCheckpoints {
548                    start,
549                    end,
550                    terminate_early,
551                    max_tasks,
552                },
553            )
554            .await;
555            match status {
556                Ok(Some((succeeded, total))) => {
557                    info!(
558                        "Epoch {} replay finished {} out of {} TXs",
559                        epoch, succeeded, total
560                    );
561
562                    return Ok(Some((succeeded, total)));
563                }
564                Ok(None) => {
565                    return Ok(None);
566                }
567                Err(e) => {
568                    error!("Epoch {} replay failed: {:?}", epoch, e);
569                    return Err(e);
570                }
571            }
572        }
573    })
574}
575
576pub(crate) fn chain_from_chain_id(chain: &str) -> Chain {
577    let mainnet_chain_id = format!("{}", get_mainnet_chain_identifier());
578    // TODO: Since testnet periodically resets, we need to ensure that the chain id
579    // is updated to the latest one.
580    let testnet_chain_id = format!("{}", get_testnet_chain_identifier());
581
582    if mainnet_chain_id == chain {
583        Chain::Mainnet
584    } else if testnet_chain_id == chain {
585        Chain::Testnet
586    } else {
587        Chain::Unknown
588    }
589}
590
591fn parse_configs_versions(
592    configs_and_versions: Option<Vec<String>>,
593) -> Option<Vec<(ObjectID, SequenceNumber)>> {
594    let configs_and_versions = configs_and_versions?;
595
596    assert!(
597        configs_and_versions.len() % 2 == 0,
598        "Invalid number of arguments for configs and version -- you must supply a version for each config"
599    );
600    Some(
601        configs_and_versions
602            .chunks_exact(2)
603            .map(|chunk| {
604                let object_id =
605                    ObjectID::from_str(&chunk[0]).expect("Invalid object id for config");
606                let object_version = SequenceNumber::from_u64(
607                    chunk[1]
608                        .parse::<u64>()
609                        .expect("Invalid object version for config"),
610                );
611                (object_id, object_version)
612            })
613            .collect(),
614    )
615}