1use std::{cmp::max, env, io::BufRead, path::PathBuf, str::FromStr};
6
7use async_recursion::async_recursion;
8use clap::Parser;
9use config::ReplayableNetworkConfigSet;
10use fuzz::{ReplayFuzzer, ReplayFuzzerConfig};
11use fuzz_mutations::base_fuzzers;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_protocol_config::Chain;
14use iota_types::{
15 base_types::{ObjectID, SequenceNumber},
16 digests::{TransactionDigest, get_mainnet_chain_identifier, get_testnet_chain_identifier},
17 message_envelope::Message,
18};
19use move_vm_config::runtime::get_default_output_filepath;
20use tracing::{error, info, warn};
21use transaction_provider::{FuzzStartPoint, TransactionSource};
22
23use crate::{
24 config::get_rpc_url,
25 replay::{ExecutionSandboxState, LocalExec, ProtocolVersionSummary},
26};
27
28pub mod batch_replay;
29pub mod config;
30mod data_fetcher;
31mod displays;
32pub mod fuzz;
33pub mod fuzz_mutations;
34mod replay;
35#[cfg(test)]
36mod tests;
37pub mod transaction_provider;
38pub mod types;
39
/// Default directory for persisted sandbox snapshots.
///
/// Built at compile time by appending `/tests/sandbox_snapshots` to this crate's
/// `CARGO_MANIFEST_DIR`. It is the default value of the `base_path` argument of
/// `ReplayToolCommand::PersistSandbox`.
static DEFAULT_SANDBOX_BASE_PATH: &str =
    concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sandbox_snapshots");
42
43#[derive(Parser, Clone)]
44pub enum ReplayToolCommand {
45 #[command(name = "gen")]
47 GenerateDefaultConfig,
48
49 #[command(name = "ps")]
51 PersistSandbox {
52 #[arg(long, short)]
53 tx_digest: String,
54 #[arg(long, short, default_value = DEFAULT_SANDBOX_BASE_PATH)]
55 base_path: PathBuf,
56 },
57
58 #[command(name = "rs")]
61 ReplaySandbox {
62 #[arg(long, short)]
63 path: PathBuf,
64 },
65
66 #[command(name = "rp")]
68 ProfileTransaction {
69 #[arg(long, short)]
70 tx_digest: String,
71 #[arg(long, short, allow_hyphen_values = true)]
74 executor_version: Option<i64>,
75 #[arg(long, short, allow_hyphen_values = true)]
78 protocol_version: Option<i64>,
79 #[arg(long, short, allow_hyphen_values = true)]
84 profile_output: Option<PathBuf>,
85 #[arg(long, num_args = 2..)]
89 config_objects: Option<Vec<String>>,
90 },
91
92 #[command(name = "tx")]
94 ReplayTransaction {
95 #[arg(long, short)]
96 tx_digest: String,
97 #[arg(long, short)]
98 show_effects: bool,
99 #[arg(long, short, allow_hyphen_values = true)]
102 executor_version: Option<i64>,
103 #[arg(long, short, allow_hyphen_values = true)]
106 protocol_version: Option<i64>,
107 #[arg(long, num_args = 2..)]
111 config_objects: Option<Vec<String>>,
112 },
113
114 #[command(name = "rb")]
116 ReplayBatch {
117 #[arg(long, short)]
118 path: PathBuf,
119 #[arg(long, short)]
120 terminate_early: bool,
121 #[arg(
122 long,
123 short,
124 default_value = "16",
125 help = "Number of tasks to run in parallel"
126 )]
127 num_tasks: u64,
128 #[arg(
129 long,
130 help = "If provided, dump the state of the execution to a file in the given directory. \
131 This will allow faster replay next time."
132 )]
133 persist_path: Option<PathBuf>,
134 },
135
136 #[command(name = "rd")]
138 ReplayDump {
139 #[arg(long, short)]
140 path: String,
141 #[arg(long, short)]
142 show_effects: bool,
143 },
144
145 #[command(name = "brd")]
148 BatchReplayFromSandbox {
149 #[arg(
150 help = "The path to the directory that contains many JSON files, each representing a persisted sandbox.\
151 These files are typically generated by running the ReplayBatch command with --persist-path specified."
152 )]
153 path: String,
154 #[arg(
155 long,
156 short,
157 default_value = "64",
158 help = "Number of tasks to run in parallel"
159 )]
160 num_tasks: usize,
161 },
162
163 #[command(name = "ch")]
165 ReplayCheckpoints {
166 #[arg(long, short)]
167 start: u64,
168 #[arg(long, short)]
169 end: u64,
170 #[arg(long, short)]
171 terminate_early: bool,
172 #[arg(long, short, default_value = "16")]
173 max_tasks: u64,
174 },
175
176 #[command(name = "ep")]
178 ReplayEpoch {
179 #[arg(long, short)]
180 epoch: u64,
181 #[arg(long, short)]
182 terminate_early: bool,
183 #[arg(long, short, default_value = "16")]
184 max_tasks: u64,
185 },
186
187 #[command(name = "fz")]
189 Fuzz {
190 #[arg(long, short)]
191 start: Option<FuzzStartPoint>,
192 #[arg(long, short)]
193 num_mutations_per_base: u64,
194 #[arg(long, short = 'b', default_value = "18446744073709551614")]
195 num_base_transactions: u64,
196 },
197
198 Report,
199}
200
201#[async_recursion]
202pub async fn execute_replay_command(
203 rpc_url: Option<String>,
204 safety_checks: bool,
205 use_authority: bool,
206 cfg_path: Option<PathBuf>,
207 chain: Option<String>,
208 cmd: ReplayToolCommand,
209) -> anyhow::Result<Option<(u64, u64)>> {
210 let safety = if safety_checks {
211 ExpensiveSafetyCheckConfig::new_enable_all()
212 } else {
213 ExpensiveSafetyCheckConfig::default()
214 };
215 Ok(match cmd {
216 ReplayToolCommand::ReplaySandbox { path } => {
217 let contents = std::fs::read_to_string(path)?;
218 let sandbox_state: ExecutionSandboxState = serde_json::from_str(&contents)?;
219 info!("Executing tx: {}", sandbox_state.transaction_info.tx_digest);
220 let sandbox_state =
221 LocalExec::certificate_execute_with_sandbox_state(&sandbox_state).await?;
222 sandbox_state.check_effects()?;
223 info!("Execution finished successfully. Local and on-chain effects match.");
224 None
225 }
226 ReplayToolCommand::PersistSandbox {
227 tx_digest,
228 base_path,
229 } => {
230 let tx_digest = TransactionDigest::from_str(&tx_digest)?;
231 info!("Executing tx: {}", tx_digest);
232 let sandbox_state = LocalExec::replay_with_network_config(
233 get_rpc_url(rpc_url, cfg_path, chain)?,
234 tx_digest,
235 safety,
236 use_authority,
237 None,
238 None,
239 None,
240 None,
241 )
242 .await?;
243
244 let out = serde_json::to_string(&sandbox_state).unwrap();
245 let path = base_path.join(format!("{}.json", tx_digest));
246 std::fs::write(path, out)?;
247 None
248 }
249 ReplayToolCommand::GenerateDefaultConfig => {
250 let set = ReplayableNetworkConfigSet::default();
251 let path = set.save_config(None).unwrap();
252 println!("Default config saved to: {}", path.to_str().unwrap());
253 warn!("Note: default config nodes might prune epochs/objects");
254 None
255 }
256 ReplayToolCommand::Fuzz {
257 start,
258 num_mutations_per_base,
259 num_base_transactions,
260 } => {
261 let config = ReplayFuzzerConfig {
262 num_mutations_per_base,
263 mutator: Box::new(base_fuzzers(num_mutations_per_base)),
264 tx_source: TransactionSource::TailLatest { start },
265 fail_over_on_err: false,
266 expensive_safety_check_config: Default::default(),
267 };
268 let fuzzer = ReplayFuzzer::new(get_rpc_url(rpc_url, cfg_path, chain)?, config)
269 .await
270 .unwrap();
271 fuzzer.run(num_base_transactions).await.unwrap();
272 None
273 }
274 ReplayToolCommand::ReplayDump { path, show_effects } => {
275 let mut lx = LocalExec::new_for_state_dump(&path, rpc_url).await?;
276 let (sandbox_state, node_dump_state) = lx.execute_state_dump(safety).await?;
277 if show_effects {
278 println!("{:#?}", sandbox_state.local_exec_effects);
279 }
280
281 sandbox_state.check_effects()?;
282
283 let effects = node_dump_state.computed_effects.digest();
284 if effects != node_dump_state.expected_effects_digest {
285 error!(
286 "Effects digest mismatch for {}: expected: {:?}, got: {:?}",
287 node_dump_state.tx_digest, node_dump_state.expected_effects_digest, effects,
288 );
289 anyhow::bail!("Effects mismatch");
290 }
291
292 info!("Execution finished successfully. Local and on-chain effects match.");
293 Some((1u64, 1u64))
294 }
295 ReplayToolCommand::ReplayBatch {
296 path,
297 terminate_early,
298 num_tasks,
299 persist_path,
300 } => {
301 let file = std::fs::File::open(path).unwrap();
302 let buf_reader = std::io::BufReader::new(file);
303 let digests = buf_reader.lines().map(|line| {
304 let line = line.unwrap();
305 TransactionDigest::from_str(&line).unwrap_or_else(|err| {
306 panic!("Error parsing tx digest {:?}: {:?}", line, err);
307 })
308 });
309 batch_replay::batch_replay(
310 digests,
311 num_tasks,
312 get_rpc_url(rpc_url, cfg_path, chain)?,
313 safety,
314 use_authority,
315 terminate_early,
316 persist_path,
317 )
318 .await;
319
320 Some((0u64, 0u64))
322 }
323 ReplayToolCommand::BatchReplayFromSandbox { path, num_tasks } => {
324 let files: Vec<_> = std::fs::read_dir(path)?
325 .filter_map(|entry| {
326 let path = entry.ok()?.path();
327 if path.is_file() {
328 path.to_str().map(|p| p.to_owned())
329 } else {
330 None
331 }
332 })
333 .collect();
334 info!("Replaying {} files", files.len());
335 let chunks = files.chunks(max(files.len() / num_tasks, 1));
336 let tasks = chunks.into_iter().map(|chunk| async move {
337 for file in chunk {
338 info!("Replaying from state dump file {}", file);
339 let contents = std::fs::read_to_string(file).unwrap();
340 let sandbox_state: ExecutionSandboxState =
341 serde_json::from_str(&contents).unwrap();
342 let sandbox_state =
343 LocalExec::certificate_execute_with_sandbox_state(&sandbox_state)
344 .await
345 .unwrap();
346 sandbox_state.check_effects().unwrap();
347 }
348 });
349 futures::future::join_all(tasks).await;
350
351 Some((0u64, 0u64))
353 }
354 ReplayToolCommand::ProfileTransaction {
355 tx_digest,
356 executor_version,
357 protocol_version,
358 profile_output,
359 config_objects,
360 } => {
361 let output_path = profile_output.or(Some(get_default_output_filepath()));
362
363 let tx_digest = TransactionDigest::from_str(&tx_digest)?;
364 info!("Executing tx: {}", tx_digest);
365 let _sandbox_state = LocalExec::replay_with_network_config(
366 get_rpc_url(rpc_url, cfg_path, chain)?,
367 tx_digest,
368 safety,
369 use_authority,
370 executor_version,
371 protocol_version,
372 output_path,
373 parse_configs_versions(config_objects),
374 )
375 .await?;
376
377 println!("Execution finished successfully.");
378 Some((1u64, 1u64))
379 }
380
381 ReplayToolCommand::ReplayTransaction {
382 tx_digest,
383 show_effects,
384 executor_version,
385 protocol_version,
386 config_objects,
387 } => {
388 let tx_digest = TransactionDigest::from_str(&tx_digest)?;
389 info!("Executing tx: {}", tx_digest);
390 let sandbox_state = LocalExec::replay_with_network_config(
391 get_rpc_url(rpc_url, cfg_path, chain)?,
392 tx_digest,
393 safety,
394 use_authority,
395 executor_version,
396 protocol_version,
397 None,
398 parse_configs_versions(config_objects),
399 )
400 .await?;
401
402 if show_effects {
403 println!("{}", sandbox_state.local_exec_effects);
404 }
405
406 sandbox_state.check_effects()?;
407
408 println!("Execution finished successfully. Local and on-chain effects match.");
409 Some((1u64, 1u64))
410 }
411
412 ReplayToolCommand::Report => {
413 let mut lx =
414 LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided")).await?;
415 let epoch_table = lx.protocol_ver_to_epoch_map().await?;
416
417 lx.current_protocol_version = *epoch_table.keys().peekable().last().unwrap();
419
420 println!(
421 " Protocol Version | Epoch Change TX | Epoch Range | Checkpoint Range "
422 );
423 println!(
424 "---------------------------------------------------------------------------------------------------------------"
425 );
426
427 for (
428 protocol_version,
429 ProtocolVersionSummary {
430 epoch_change_tx: tx_digest,
431 epoch_start: start_epoch,
432 epoch_end: end_epoch,
433 checkpoint_start,
434 checkpoint_end,
435 ..
436 },
437 ) in epoch_table
438 {
439 println!(
440 " {:^16} | {:^43} | {:^10}-{:^10}| {:^10}-{:^10} ",
441 protocol_version,
442 tx_digest,
443 start_epoch,
444 end_epoch,
445 checkpoint_start.unwrap_or(u64::MAX),
446 checkpoint_end.unwrap_or(u64::MAX)
447 );
448 }
449
450 lx.populate_protocol_version_tables().await?;
451 for x in lx.protocol_version_system_package_table {
452 println!("Protocol version: {}", x.0);
453 for (package_id, seq_num) in x.1 {
454 println!("Package: {} Seq: {}", package_id, seq_num);
455 }
456 }
457 None
458 }
459
460 ReplayToolCommand::ReplayCheckpoints {
461 start,
462 end,
463 terminate_early,
464 max_tasks,
465 } => {
466 assert!(start <= end, "Start checkpoint must be <= end checkpoint");
467 assert!(max_tasks > 0, "Max tasks must be > 0");
468 let checkpoints_per_task = ((end - start + max_tasks) / max_tasks) as usize;
469 let mut handles = vec![];
470 info!(
471 "Executing checkpoints {} to {} with at most {} tasks and at most {} checkpoints per task",
472 start, end, max_tasks, checkpoints_per_task
473 );
474
475 let range: Vec<_> = (start..=end).collect();
476 for (task_count, checkpoints) in range.chunks(checkpoints_per_task).enumerate() {
477 let checkpoints = checkpoints.to_vec();
478 let rpc_url = rpc_url.clone();
479 let safety = safety.clone();
480 handles.push(tokio::spawn(async move {
481 info!("Spawning task {task_count} for checkpoints {checkpoints:?}");
482 let time = std::time::Instant::now();
483 let (succeeded, total) = LocalExec::new_from_fn_url(&rpc_url.expect("Url must be provided"))
484 .await
485 .unwrap()
486 .init_for_execution()
487 .await
488 .unwrap()
489 .execute_all_in_checkpoints(&checkpoints, &safety, terminate_early, use_authority)
490 .await
491 .unwrap();
492 let time = time.elapsed();
493 info!(
494 "Task {task_count}: executed checkpoints {:?} @ {} total transactions, {} succeeded",
495 checkpoints, total, succeeded
496 );
497 (succeeded, total, time)
498 }));
499 }
500
501 let mut total_tx = 0;
502 let mut total_time_ms = 0;
503 let mut total_succeeded = 0;
504 futures::future::join_all(handles)
505 .await
506 .into_iter()
507 .for_each(|x| match x {
508 Ok((succeeded, total, time)) => {
509 total_tx += total;
510 total_time_ms += time.as_millis() as u64;
511 total_succeeded += succeeded;
512 }
513 Err(e) => {
514 error!("Task failed: {:?}", e);
515 }
516 });
517 info!(
518 "Executed {} checkpoints @ {}/{} total TXs succeeded in {} ms ({}) avg TX/s",
519 end - start + 1,
520 total_succeeded,
521 total_tx,
522 total_time_ms,
523 (total_tx as f64) / (total_time_ms as f64 / 1000.0)
524 );
525 Some((total_succeeded, total_tx))
526 }
527 ReplayToolCommand::ReplayEpoch {
528 epoch,
529 terminate_early,
530 max_tasks,
531 } => {
532 let lx =
533 LocalExec::new_from_fn_url(&rpc_url.clone().expect("Url must be provided")).await?;
534
535 let (start, end) = lx.checkpoints_for_epoch(epoch).await?;
536
537 info!(
538 "Executing epoch {} (checkpoint range {}-{}) with at most {} tasks",
539 epoch, start, end, max_tasks
540 );
541 let status = execute_replay_command(
542 rpc_url,
543 safety_checks,
544 use_authority,
545 cfg_path,
546 chain,
547 ReplayToolCommand::ReplayCheckpoints {
548 start,
549 end,
550 terminate_early,
551 max_tasks,
552 },
553 )
554 .await;
555 match status {
556 Ok(Some((succeeded, total))) => {
557 info!(
558 "Epoch {} replay finished {} out of {} TXs",
559 epoch, succeeded, total
560 );
561
562 return Ok(Some((succeeded, total)));
563 }
564 Ok(None) => {
565 return Ok(None);
566 }
567 Err(e) => {
568 error!("Epoch {} replay failed: {:?}", epoch, e);
569 return Err(e);
570 }
571 }
572 }
573 })
574}
575
576pub(crate) fn chain_from_chain_id(chain: &str) -> Chain {
577 let mainnet_chain_id = format!("{}", get_mainnet_chain_identifier());
578 let testnet_chain_id = format!("{}", get_testnet_chain_identifier());
581
582 if mainnet_chain_id == chain {
583 Chain::Mainnet
584 } else if testnet_chain_id == chain {
585 Chain::Testnet
586 } else {
587 Chain::Unknown
588 }
589}
590
591fn parse_configs_versions(
592 configs_and_versions: Option<Vec<String>>,
593) -> Option<Vec<(ObjectID, SequenceNumber)>> {
594 let configs_and_versions = configs_and_versions?;
595
596 assert!(
597 configs_and_versions.len() % 2 == 0,
598 "Invalid number of arguments for configs and version -- you must supply a version for each config"
599 );
600 Some(
601 configs_and_versions
602 .chunks_exact(2)
603 .map(|chunk| {
604 let object_id =
605 ObjectID::from_str(&chunk[0]).expect("Invalid object id for config");
606 let object_version = SequenceNumber::from_u64(
607 chunk[1]
608 .parse::<u64>()
609 .expect("Invalid object version for config"),
610 );
611 (object_id, object_version)
612 })
613 .collect(),
614 )
615}