1use std::{
6 collections::{BTreeMap, HashSet},
7 path::PathBuf,
8 sync::{Arc, Mutex},
9};
10
11use futures::executor::block_on;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_core::authority::NodeStateDump;
14use iota_execution::Executor;
15use iota_framework::BuiltInFramework;
16use iota_json_rpc_types::{
17 IotaExecutionStatus, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI,
18};
19use iota_protocol_config::{Chain, ProtocolConfig};
20use iota_sdk::{IotaClient, IotaClientBuilder};
21use iota_types::{
22 IOTA_DENY_LIST_OBJECT_ID,
23 base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
24 committee::EpochId,
25 digests::{ObjectDigest, TransactionDigest},
26 error::{ExecutionError, IotaError, IotaResult},
27 executable_transaction::VerifiedExecutableTransaction,
28 gas::IotaGasStatus,
29 in_memory_storage::InMemoryStorage,
30 inner_temporary_store::InnerTemporaryStore,
31 message_envelope::Message,
32 metrics::LimitsMetrics,
33 object::{Data, Object, Owner},
34 storage::{
35 BackingPackageStore, ChildObjectResolver, ObjectStore, PackageObject, get_module,
36 get_module_by_id,
37 },
38 transaction::{
39 CheckedInputObjects, GasData, InputObjectKind, InputObjects, ObjectReadResult,
40 ObjectReadResultKind, SenderSignedData, Transaction, TransactionDataAPI,
41 TransactionKind::{self, ProgrammableTransaction},
42 VerifiedTransaction,
43 },
44};
45use move_binary_format::CompiledModule;
46use move_bytecode_utils::module_cache::GetModule;
47use move_core_types::{
48 account_address::AccountAddress,
49 language_storage::{ModuleId, StructTag},
50 resolver::{ModuleResolver, ResourceResolver},
51};
52use prometheus::Registry;
53use serde::{Deserialize, Serialize};
54use similar::{ChangeTag, TextDiff};
55use tracing::{error, info, trace, warn};
56
57use crate::{
58 chain_from_chain_id,
59 data_fetcher::{
60 DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
61 },
62 displays::{
63 Pretty,
64 transaction_displays::{FullPTB, transform_command_results_to_annotated},
65 },
66 types::*,
67};
68
69#[derive(Debug, Serialize, Deserialize)]
72pub struct ExecutionSandboxState {
73 pub transaction_info: OnChainTransactionInfo,
75 pub required_objects: Vec<Object>,
77 #[serde(skip)]
80 pub local_exec_temporary_store: Option<InnerTemporaryStore>,
81 pub local_exec_effects: IotaTransactionBlockEffects,
83 #[serde(skip)]
85 pub local_exec_status: Option<Result<(), ExecutionError>>,
86}
87
88impl ExecutionSandboxState {
89 pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
90 if self.transaction_info.effects != self.local_exec_effects {
91 error!("Replay tool forked {}", self.transaction_info.tx_digest);
92 let diff = self.diff_effects();
93 println!("{diff}");
94 return Err(ReplayEngineError::EffectsForked {
95 digest: self.transaction_info.tx_digest,
96 diff: format!("\n{diff}"),
97 on_chain: Box::new(self.transaction_info.effects.clone()),
98 local: Box::new(self.local_exec_effects.clone()),
99 });
100 }
101 Ok(())
102 }
103
104 pub fn diff_effects(&self) -> String {
106 let eff1 = &self.transaction_info.effects;
107 let eff2 = &self.local_exec_effects;
108 let on_chain_str = format!("{eff1:#?}");
109 let local_chain_str = format!("{eff2:#?}");
110 let mut res = vec![];
111
112 let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
113 for change in diff.iter_all_changes() {
114 let sign = match change.tag() {
115 ChangeTag::Delete => "---",
116 ChangeTag::Insert => "+++",
117 ChangeTag::Equal => " ",
118 };
119 res.push(format!("{sign}{change}"));
120 }
121
122 res.join("")
123 }
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
127pub struct ProtocolVersionSummary {
128 pub protocol_version: u64,
130 pub epoch_start: u64,
132 pub epoch_end: u64,
134 pub checkpoint_start: Option<u64>,
136 pub checkpoint_end: Option<u64>,
138 pub epoch_change_tx: TransactionDigest,
140}
141
142#[derive(Clone)]
143pub struct Storage {
144 pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
149
150 pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
153 pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
156}
157
158impl std::fmt::Display for Storage {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 writeln!(f, "Live object store")?;
161 for (id, obj) in self
162 .live_objects_store
163 .lock()
164 .expect("Unable to lock")
165 .iter()
166 {
167 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
168 }
169 writeln!(f, "Package cache")?;
170 for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
171 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
172 }
173 writeln!(f, "Object version cache")?;
174 for (id, _) in self
175 .object_version_cache
176 .lock()
177 .expect("Unable to lock")
178 .iter()
179 {
180 writeln!(f, "{}: {}", id.0, id.1)?;
181 }
182
183 write!(f, "")
184 }
185}
186
187impl Storage {
188 pub fn default() -> Self {
189 Self {
190 live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
191 package_cache: Arc::new(Mutex::new(BTreeMap::new())),
192 object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
193 }
194 }
195
196 pub fn all_objects(&self) -> Vec<Object> {
197 self.live_objects_store
198 .lock()
199 .expect("Unable to lock")
200 .values()
201 .cloned()
202 .chain(
203 self.package_cache
204 .lock()
205 .expect("Unable to lock")
206 .values()
207 .cloned(),
208 )
209 .chain(
210 self.object_version_cache
211 .lock()
212 .expect("Unable to lock")
213 .values()
214 .cloned(),
215 )
216 .collect::<Vec<_>>()
217 }
218}
219
220#[derive(Clone)]
221pub struct LocalExec {
222 pub client: Option<IotaClient>,
223 pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
226 pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
228 pub current_protocol_version: u64,
230 pub storage: Storage,
232 pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
234 pub metrics: Arc<LimitsMetrics>,
236 pub fetcher: Fetchers,
238
239 pub executor_version: Option<i64>,
242 pub protocol_version: Option<i64>,
246 pub enable_profiler: Option<PathBuf>,
249 pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
250 pub num_retries_for_timeout: u32,
252 pub sleep_period_for_timeout: std::time::Duration,
253}
254
255impl LocalExec {
256 pub async fn multi_download(
259 &self,
260 objs: &[(ObjectID, SequenceNumber)],
261 ) -> Result<Vec<Object>, ReplayEngineError> {
262 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
263 while num_retries_for_timeout >= 0 {
264 match self.fetcher.multi_get_versioned(objs).await {
265 Ok(objs) => return Ok(objs),
266 Err(ReplayEngineError::IotaRpcRequestTimeout) => {
267 warn!(
268 "RPC request timed out. Retries left {}. Sleeping for {}s",
269 num_retries_for_timeout,
270 self.sleep_period_for_timeout.as_secs()
271 );
272 num_retries_for_timeout -= 1;
273 tokio::time::sleep(self.sleep_period_for_timeout).await;
274 }
275 Err(e) => return Err(e),
276 }
277 }
278 Err(ReplayEngineError::IotaRpcRequestTimeout)
279 }
280 pub async fn multi_download_latest(
283 &self,
284 objs: &[ObjectID],
285 ) -> Result<Vec<Object>, ReplayEngineError> {
286 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
287 while num_retries_for_timeout >= 0 {
288 match self.fetcher.multi_get_latest(objs).await {
289 Ok(objs) => return Ok(objs),
290 Err(ReplayEngineError::IotaRpcRequestTimeout) => {
291 warn!(
292 "RPC request timed out. Retries left {}. Sleeping for {}s",
293 num_retries_for_timeout,
294 self.sleep_period_for_timeout.as_secs()
295 );
296 num_retries_for_timeout -= 1;
297 tokio::time::sleep(self.sleep_period_for_timeout).await;
298 }
299 Err(e) => return Err(e),
300 }
301 }
302 Err(ReplayEngineError::IotaRpcRequestTimeout)
303 }
304
305 pub async fn fetch_loaded_child_refs(
306 &self,
307 tx_digest: &TransactionDigest,
308 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
309 self.fetcher.get_loaded_child_objects(tx_digest).await
311 }
312
313 pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
314 Self::new_for_remote(
315 IotaClientBuilder::default()
316 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
317 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
318 .build(http_url)
319 .await?,
320 None,
321 )
322 .await
323 }
324
325 pub async fn replay_with_network_config(
326 rpc_url: String,
327 tx_digest: TransactionDigest,
328 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
329 use_authority: bool,
330 executor_version: Option<i64>,
331 protocol_version: Option<i64>,
332 enable_profiler: Option<PathBuf>,
333 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
334 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
335 info!("Using RPC URL: {}", rpc_url);
336 LocalExec::new_from_fn_url(&rpc_url)
337 .await?
338 .init_for_execution()
339 .await?
340 .execute_transaction(
341 &tx_digest,
342 expensive_safety_check_config,
343 use_authority,
344 executor_version,
345 protocol_version,
346 enable_profiler,
347 config_and_versions,
348 )
349 .await
350 }
351
352 pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
357 self.populate_protocol_version_tables().await?;
358 tokio::task::yield_now().await;
359 Ok(self)
360 }
361
362 pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
363 Self::new_for_remote(
364 self.client.expect("Remote client not initialized"),
365 Some(self.fetcher.into_remote()),
366 )
367 .await?
368 .init_for_execution()
369 .await
370 }
371
372 pub async fn new_for_remote(
373 client: IotaClient,
374 remote_fetcher: Option<RemoteFetcher>,
375 ) -> Result<Self, ReplayEngineError> {
376 let registry = prometheus::Registry::new();
378 let metrics = Arc::new(LimitsMetrics::new(®istry));
379
380 let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
381
382 Ok(Self {
383 client: Some(client),
384 protocol_version_epoch_table: BTreeMap::new(),
385 protocol_version_system_package_table: BTreeMap::new(),
386 current_protocol_version: 0,
387 exec_store_events: Arc::new(Mutex::new(Vec::new())),
388 metrics,
389 storage: Storage::default(),
390 fetcher: Fetchers::Remote(fetcher),
391 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
393 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
394 executor_version: None,
395 protocol_version: None,
396 enable_profiler: None,
397 config_and_versions: None,
398 })
399 }
400
401 pub async fn new_for_state_dump(
402 path: &str,
403 backup_rpc_url: Option<String>,
404 ) -> Result<Self, ReplayEngineError> {
405 let registry = prometheus::Registry::new();
407 let metrics = Arc::new(LimitsMetrics::new(®istry));
408
409 let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
410 let current_protocol_version = state.protocol_version;
411 let fetcher = match backup_rpc_url {
412 Some(url) => NodeStateDumpFetcher::new(
413 state,
414 Some(RemoteFetcher::new(
415 IotaClientBuilder::default()
416 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
417 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
418 .build(url)
419 .await?,
420 )),
421 ),
422 None => NodeStateDumpFetcher::new(state, None),
423 };
424
425 Ok(Self {
426 client: None,
427 protocol_version_epoch_table: BTreeMap::new(),
428 protocol_version_system_package_table: BTreeMap::new(),
429 current_protocol_version,
430 exec_store_events: Arc::new(Mutex::new(Vec::new())),
431 metrics,
432 storage: Storage::default(),
433 fetcher: Fetchers::NodeStateDump(fetcher),
434 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
436 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
437 executor_version: None,
438 protocol_version: None,
439 enable_profiler: None,
440 config_and_versions: None,
441 })
442 }
443
444 pub async fn multi_download_and_store(
445 &mut self,
446 objs: &[(ObjectID, SequenceNumber)],
447 ) -> Result<Vec<Object>, ReplayEngineError> {
448 let objs = self.multi_download(objs).await?;
449
450 for obj in objs.iter() {
452 let o_ref = obj.compute_object_reference();
453 self.storage
454 .live_objects_store
455 .lock()
456 .expect("Can't lock")
457 .insert(o_ref.0, obj.clone());
458 self.storage
459 .object_version_cache
460 .lock()
461 .expect("Cannot lock")
462 .insert((o_ref.0, o_ref.1), obj.clone());
463 if obj.is_package() {
464 self.storage
465 .package_cache
466 .lock()
467 .expect("Cannot lock")
468 .insert(o_ref.0, obj.clone());
469 }
470 }
471 tokio::task::yield_now().await;
472 Ok(objs)
473 }
474
475 pub async fn multi_download_relevant_packages_and_store(
476 &mut self,
477 objs: Vec<ObjectID>,
478 protocol_version: u64,
479 ) -> Result<Vec<Object>, ReplayEngineError> {
480 let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
481 BuiltInFramework::genesis_objects().collect()
482 } else {
483 let syst_packages =
484 self.system_package_versions_for_protocol_version(protocol_version)?;
485 self.multi_download(&syst_packages).await?
486 };
487
488 let non_system_package_objs: Vec<_> = objs
491 .into_iter()
492 .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
493 .collect();
494 let objs = self
495 .multi_download_latest(&non_system_package_objs)
496 .await?
497 .into_iter()
498 .chain(syst_packages_objs.into_iter());
499
500 for obj in objs.clone() {
501 let o_ref = obj.compute_object_reference();
502 self.storage
505 .object_version_cache
506 .lock()
507 .expect("Cannot lock")
508 .insert((o_ref.0, o_ref.1), obj.clone());
509 if obj.is_package() {
510 self.storage
511 .package_cache
512 .lock()
513 .expect("Cannot lock")
514 .insert(o_ref.0, obj.clone());
515 }
516 }
517 Ok(objs.collect())
518 }
519
520 #[expect(clippy::disallowed_methods)]
522 pub fn download_object(
523 &self,
524 object_id: &ObjectID,
525 version: SequenceNumber,
526 ) -> Result<Object, ReplayEngineError> {
527 if self
528 .storage
529 .object_version_cache
530 .lock()
531 .expect("Cannot lock")
532 .contains_key(&(*object_id, version))
533 {
534 return Ok(self
535 .storage
536 .object_version_cache
537 .lock()
538 .expect("Cannot lock")
539 .get(&(*object_id, version))
540 .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
541 id: *object_id,
542 version: Some(version),
543 })?
544 .clone());
545 }
546
547 let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
548 q.pop().unwrap_or_else(|| {
549 panic!(
550 "Downloaded obj response cannot be empty {:?}",
551 (*object_id, version)
552 )
553 })
554 })?;
555
556 let o_ref = o.compute_object_reference();
557 self.storage
558 .object_version_cache
559 .lock()
560 .expect("Cannot lock")
561 .insert((o_ref.0, o_ref.1), o.clone());
562 Ok(o)
563 }
564
565 #[expect(clippy::disallowed_methods)]
567 pub fn download_latest_object(
568 &self,
569 object_id: &ObjectID,
570 ) -> Result<Option<Object>, ReplayEngineError> {
571 let resp = block_on({
572 self.multi_download_latest(&[*object_id])
574 })
575 .map(|mut q| {
576 q.pop()
577 .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
578 });
579
580 match resp {
581 Ok(v) => Ok(Some(v)),
582 Err(ReplayEngineError::ObjectNotExist { id }) => {
583 error!(
584 "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
585 );
586 Ok(None)
587 }
588 Err(ReplayEngineError::ObjectDeleted {
589 id,
590 version,
591 digest,
592 }) => {
593 error!("Object {id} {version} {digest} was deleted on RPC server.");
594 Ok(None)
595 }
596 Err(err) => Err(ReplayEngineError::IotaRpcError {
597 err: err.to_string(),
598 }),
599 }
600 }
601
602 #[expect(clippy::disallowed_methods)]
603 pub fn download_object_by_upper_bound(
604 &self,
605 object_id: &ObjectID,
606 version_upper_bound: VersionNumber,
607 ) -> Result<Option<Object>, ReplayEngineError> {
608 let local_object = self
609 .storage
610 .live_objects_store
611 .lock()
612 .expect("Can't lock")
613 .get(object_id)
614 .cloned();
615 if local_object.is_some() {
616 return Ok(local_object);
617 }
618 let response = block_on({
619 self.fetcher
620 .get_child_object(object_id, version_upper_bound)
621 });
622 match response {
623 Ok(object) => {
624 let obj_ref = object.compute_object_reference();
625 self.storage
626 .live_objects_store
627 .lock()
628 .expect("Can't lock")
629 .insert(*object_id, object.clone());
630 self.storage
631 .object_version_cache
632 .lock()
633 .expect("Can't lock")
634 .insert((obj_ref.0, obj_ref.1), object.clone());
635 Ok(Some(object))
636 }
637 Err(ReplayEngineError::ObjectNotExist { id }) => {
638 error!(
639 "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
640 );
641 Ok(None)
642 }
643 Err(ReplayEngineError::ObjectDeleted {
644 id,
645 version,
646 digest,
647 }) => {
648 error!("Object {id} {version} {digest} was deleted on RPC server.");
649 Ok(None)
650 }
651 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
654 info!(
655 "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
656 );
657 Ok(None)
658 }
659 Err(err) => Err(ReplayEngineError::IotaRpcError {
660 err: err.to_string(),
661 }),
662 }
663 }
664
665 pub async fn get_checkpoint_txs(
666 &self,
667 checkpoint_id: u64,
668 ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
669 self.fetcher
670 .get_checkpoint_txs(checkpoint_id)
671 .await
672 .map_err(|e| ReplayEngineError::IotaRpcError { err: e.to_string() })
673 }
674
675 pub async fn execute_all_in_checkpoints(
676 &mut self,
677 checkpoint_ids: &[u64],
678 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
679 terminate_early: bool,
680 use_authority: bool,
681 ) -> Result<(u64, u64), ReplayEngineError> {
682 let mut txs = Vec::new();
684 for checkpoint_id in checkpoint_ids {
685 txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
686 }
687 let num = txs.len();
688 let mut succeeded = 0;
689 for tx in txs {
690 match self
691 .execute_transaction(
692 &tx,
693 expensive_safety_check_config.clone(),
694 use_authority,
695 None,
696 None,
697 None,
698 None,
699 )
700 .await
701 .map(|q| q.check_effects())
702 {
703 Err(e) | Ok(Err(e)) => {
704 if terminate_early {
705 return Err(e);
706 }
707 error!("Error executing tx: {}, {:#?}", tx, e);
708 continue;
709 }
710 _ => (),
711 }
712
713 succeeded += 1;
714 }
715 Ok((succeeded, num as u64))
716 }
717
718 pub async fn execution_engine_execute_with_tx_info_impl(
719 &mut self,
720 tx_info: &OnChainTransactionInfo,
721 override_transaction_kind: Option<TransactionKind>,
722 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
723 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
724 let tx_digest = &tx_info.tx_digest;
725
726 let input_objects = self.initialize_execution_env_state(tx_info).await?;
729 assert_eq!(
730 &input_objects.filter_shared_objects().len(),
731 &tx_info.shared_object_refs.len()
732 );
733 let protocol_config =
738 &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);
739
740 let metrics = self.metrics.clone();
741
742 let ov = self.executor_version;
743
744 let executor = get_executor(
746 ov,
747 protocol_config,
748 expensive_safety_check_config,
749 self.enable_profiler.clone(),
750 );
751
752 let expensive_checks = true;
754 let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
755 let certificate_deny_set = HashSet::new();
756 let gas_status = if tx_info.kind.is_system_tx() {
757 IotaGasStatus::new_unmetered()
758 } else {
759 IotaGasStatus::new(
760 tx_info.gas_budget,
761 tx_info.gas_price,
762 tx_info.reference_gas_price,
763 protocol_config,
764 )
765 .expect("Failed to create gas status")
766 };
767 let gas_data = GasData {
768 payment: tx_info.gas.clone(),
769 owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
770 price: tx_info.gas_price,
771 budget: tx_info.gas_budget,
772 };
773 let (inner_store, gas_status, effects, result) = executor.execute_transaction_to_effects(
774 &self,
775 protocol_config,
776 metrics.clone(),
777 expensive_checks,
778 &certificate_deny_set,
779 &tx_info.executed_epoch,
780 tx_info.epoch_start_timestamp,
781 CheckedInputObjects::new_for_replay(input_objects.clone()),
782 gas_data,
783 gas_status,
784 transaction_kind.clone(),
785 tx_info.sender,
786 *tx_digest,
787 &mut None,
788 );
789
790 if let Err(err) = self.pretty_print_for_tracing(
791 &gas_status,
792 &executor,
793 tx_info,
794 &transaction_kind,
795 protocol_config,
796 metrics,
797 expensive_checks,
798 input_objects,
799 ) {
800 error!("Failed to pretty print for tracing: {:?}", err);
801 }
802
803 let all_required_objects = self.storage.all_objects();
804
805 let effects =
806 IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
807
808 Ok(ExecutionSandboxState {
809 transaction_info: tx_info.clone(),
810 required_objects: all_required_objects,
811 local_exec_temporary_store: Some(inner_store),
812 local_exec_effects: effects,
813 local_exec_status: Some(result),
814 })
815 }
816
817 fn pretty_print_for_tracing(
818 &self,
819 gas_status: &IotaGasStatus,
820 executor: &Arc<dyn Executor + Send + Sync>,
821 tx_info: &OnChainTransactionInfo,
822 transaction_kind: &TransactionKind,
823 protocol_config: &ProtocolConfig,
824 metrics: Arc<LimitsMetrics>,
825 expensive_checks: bool,
826 input_objects: InputObjects,
827 ) -> anyhow::Result<()> {
828 trace!(target: "replay_gas_info", "{}", Pretty(gas_status));
829
830 let skip_checks = true;
831 let gas_data = GasData {
832 payment: tx_info.gas.clone(),
833 owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
834 price: tx_info.gas_price,
835 budget: tx_info.gas_budget,
836 };
837 if let ProgrammableTransaction(pt) = transaction_kind {
838 trace!(
839 target: "replay_ptb_info",
840 "{}",
841 Pretty(&FullPTB {
842 ptb: pt.clone(),
843 results: transform_command_results_to_annotated(
844 executor,
845 &self.clone(),
846 executor.dev_inspect_transaction(
847 &self,
848 protocol_config,
849 metrics,
850 expensive_checks,
851 &HashSet::new(),
852 &tx_info.executed_epoch,
853 tx_info.epoch_start_timestamp,
854 CheckedInputObjects::new_for_replay(input_objects),
855 gas_data,
856 IotaGasStatus::new(
857 tx_info.gas_budget,
858 tx_info.gas_price,
859 tx_info.reference_gas_price,
860 protocol_config,
861 )?,
862 transaction_kind.clone(),
863 tx_info.sender,
864 tx_info.sender_signed_data.digest(),
865 skip_checks,
866 )
867 .3
868 .unwrap_or_default(),
869 )?,
870 }));
871 }
872 Ok(())
873 }
874
875 pub async fn execution_engine_execute_impl(
877 &mut self,
878 tx_digest: &TransactionDigest,
879 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
880 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
881 if self.is_remote_replay() {
882 assert!(
883 !self.protocol_version_system_package_table.is_empty()
884 || !self.protocol_version_epoch_table.is_empty(),
885 "Required tables not populated. Must call `init_for_execution` before executing transactions"
886 );
887 }
888
889 let tx_info = if self.is_remote_replay() {
890 self.resolve_tx_components(tx_digest).await?
891 } else {
892 self.resolve_tx_components_from_dump(tx_digest).await?
893 };
894 self.execution_engine_execute_with_tx_info_impl(
895 &tx_info,
896 None,
897 expensive_safety_check_config,
898 )
899 .await
900 }
901
902 pub async fn certificate_execute_with_sandbox_state(
906 pre_run_sandbox: &ExecutionSandboxState,
907 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
908 let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
910 let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
911 let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
912 let protocol_config = ProtocolConfig::get_for_version(
913 pre_run_sandbox.transaction_info.protocol_version,
914 pre_run_sandbox.transaction_info.chain,
915 );
916 let required_objects = pre_run_sandbox.required_objects.clone();
917 let store = InMemoryStorage::new(required_objects.clone());
918
919 let transaction =
920 Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());
921
922 let input_objects = store.read_input_objects_for_transaction(&transaction);
928 let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
929 VerifiedTransaction::new_unchecked(transaction),
930 executed_epoch,
931 );
932 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
933 &executable,
934 input_objects,
935 &protocol_config,
936 reference_gas_price,
937 )
938 .unwrap();
939 let (kind, signer, gas_data) = executable.transaction_data().execution_parts();
940 let executor = iota_execution::executor(&protocol_config, true, None).unwrap();
941 let (_, _, effects, exec_res) = executor.execute_transaction_to_effects(
942 &store,
943 &protocol_config,
944 Arc::new(LimitsMetrics::new(&Registry::new())),
945 true,
946 &HashSet::new(),
947 &executed_epoch,
948 epoch_start_timestamp,
949 input_objects,
950 gas_data,
951 gas_status,
952 kind,
953 signer,
954 *executable.digest(),
955 &mut None,
956 );
957
958 let effects =
959 IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
960
961 Ok(ExecutionSandboxState {
962 transaction_info: pre_run_sandbox.transaction_info.clone(),
963 required_objects,
964 local_exec_temporary_store: None, local_exec_effects: effects,
966 local_exec_status: Some(exec_res),
967 })
968 }
969
970 pub async fn certificate_execute(
974 &mut self,
975 tx_digest: &TransactionDigest,
976 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
977 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
978 let pre_run_sandbox = self
980 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
981 .await?;
982 Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
983 }
984
985 pub async fn execution_engine_execute(
989 &mut self,
990 tx_digest: &TransactionDigest,
991 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
992 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
993 let sandbox_state = self
994 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
995 .await?;
996
997 Ok(sandbox_state)
998 }
999
1000 pub async fn execute_state_dump(
1001 &mut self,
1002 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1003 ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
1004 assert!(!self.is_remote_replay());
1005
1006 let d = match self.fetcher.clone() {
1007 Fetchers::NodeStateDump(d) => d,
1008 _ => panic!("Invalid fetcher for state dump"),
1009 };
1010 let tx_digest = d.node_state_dump.clone().tx_digest;
1011 let sandbox_state = self
1012 .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1013 .await?;
1014
1015 Ok((sandbox_state, d.node_state_dump))
1016 }
1017
1018 pub async fn execute_transaction(
1019 &mut self,
1020 tx_digest: &TransactionDigest,
1021 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1022 use_authority: bool,
1023 executor_version: Option<i64>,
1024 protocol_version: Option<i64>,
1025 enable_profiler: Option<PathBuf>,
1026 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1027 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1028 self.executor_version = executor_version;
1029 self.protocol_version = protocol_version;
1030 self.enable_profiler = enable_profiler;
1031 self.config_and_versions = config_and_versions;
1032 if use_authority {
1033 self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1034 .await
1035 } else {
1036 self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1037 .await
1038 }
1039 }
1040 fn system_package_ids(_protocol_version: u64) -> Vec<ObjectID> {
1041 BuiltInFramework::all_package_ids()
1042 }
1043
1044 pub fn get_or_download_object(
1046 &self,
1047 obj_id: &ObjectID,
1048 package_expected: bool,
1049 ) -> Result<Option<Object>, ReplayEngineError> {
1050 if package_expected {
1051 if let Some(obj) = self
1052 .storage
1053 .package_cache
1054 .lock()
1055 .expect("Cannot lock")
1056 .get(obj_id)
1057 {
1058 return Ok(Some(obj.clone()));
1059 };
1060 } else if let Some(obj) = self
1067 .storage
1068 .live_objects_store
1069 .lock()
1070 .expect("Can't lock")
1071 .get(obj_id)
1072 {
1073 return Ok(Some(obj.clone()));
1074 }
1075
1076 let Some(o) = self.download_latest_object(obj_id)? else {
1077 return Ok(None);
1078 };
1079
1080 if o.is_package() {
1081 assert!(
1082 package_expected,
1083 "Did not expect package but downloaded object is a package: {obj_id}"
1084 );
1085
1086 self.storage
1087 .package_cache
1088 .lock()
1089 .expect("Cannot lock")
1090 .insert(*obj_id, o.clone());
1091 }
1092 let o_ref = o.compute_object_reference();
1093 self.storage
1094 .object_version_cache
1095 .lock()
1096 .expect("Cannot lock")
1097 .insert((o_ref.0, o_ref.1), o.clone());
1098 Ok(Some(o))
1099 }
1100
1101 pub fn is_remote_replay(&self) -> bool {
1102 matches!(self.fetcher, Fetchers::Remote(_))
1103 }
1104
1105 pub fn system_package_versions_for_protocol_version(
1107 &self,
1108 protocol_version: u64,
1109 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1110 match &self.fetcher {
1111 Fetchers::Remote(_) => Ok(self
1112 .protocol_version_system_package_table
1113 .get(&protocol_version)
1114 .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1115 protocol_version,
1116 })?
1117 .clone()
1118 .into_iter()
1119 .collect()),
1120
1121 Fetchers::NodeStateDump(d) => Ok(d
1122 .node_state_dump
1123 .relevant_system_packages
1124 .iter()
1125 .map(|w| (w.id, w.version, w.digest))
1126 .map(|q| (q.0, q.1))
1127 .collect()),
1128 }
1129 }
1130
1131 pub async fn protocol_ver_to_epoch_map(
1132 &self,
1133 ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
1134 let mut range_map = BTreeMap::new();
1135 let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;
1136
1137 let mut tx_digest = *self
1139 .fetcher
1140 .get_checkpoint_txs(0)
1141 .await?
1142 .first()
1143 .expect("Genesis TX must be in first checkpoint");
1144 let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
1147 (0, 1, Some(0u64));
1148
1149 let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
1150 (start_epoch, start_protocol_version, start_checkpoint);
1151
1152 (start_epoch, start_protocol_version, start_checkpoint) =
1153 (curr_epoch, curr_protocol_version, curr_checkpoint);
1154
1155 let mut end_epoch_tx_digest = tx_digest;
1158
1159 for event in epoch_change_events {
1160 (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
1161 end_epoch_tx_digest = event.id.tx_digest;
1162
1163 if start_protocol_version == curr_protocol_version {
1164 continue;
1166 }
1167
1168 curr_checkpoint = self
1171 .fetcher
1172 .get_transaction(&event.id.tx_digest)
1173 .await?
1174 .checkpoint;
1175 range_map.insert(
1177 start_protocol_version,
1178 ProtocolVersionSummary {
1179 protocol_version: start_protocol_version,
1180 epoch_start: start_epoch,
1181 epoch_end: curr_epoch - 1,
1182 checkpoint_start: start_checkpoint,
1183 checkpoint_end: curr_checkpoint.map(|x| x - 1),
1184 epoch_change_tx: tx_digest,
1185 },
1186 );
1187
1188 start_epoch = curr_epoch;
1189 start_protocol_version = curr_protocol_version;
1190 tx_digest = event.id.tx_digest;
1191 start_checkpoint = curr_checkpoint;
1192 }
1193
1194 range_map.insert(
1196 curr_protocol_version,
1197 ProtocolVersionSummary {
1198 protocol_version: curr_protocol_version,
1199 epoch_start: start_epoch,
1200 epoch_end: curr_epoch,
1201 checkpoint_start: curr_checkpoint,
1202 checkpoint_end: self
1203 .fetcher
1204 .get_transaction(&end_epoch_tx_digest)
1205 .await?
1206 .checkpoint,
1207 epoch_change_tx: tx_digest,
1208 },
1209 );
1210
1211 Ok(range_map)
1212 }
1213
1214 pub fn protocol_version_for_epoch(
1215 epoch: u64,
1216 mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1217 ) -> u64 {
1218 let mut version = 1;
1221 for (k, v) in mp.iter().rev() {
1222 if v.1 <= epoch {
1223 version = *k;
1224 break;
1225 }
1226 }
1227 version
1228 }
1229
1230 pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1231 self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1232
1233 let system_package_revisions = self.system_package_versions().await?;
1234
1235 for (
1238 prot_ver,
1239 ProtocolVersionSummary {
1240 epoch_change_tx: tx_digest,
1241 ..
1242 },
1243 ) in self.protocol_version_epoch_table.clone()
1244 {
1245 let mut working = if prot_ver <= 1 {
1247 BTreeMap::new()
1248 } else {
1249 self.protocol_version_system_package_table
1250 .iter()
1251 .rev()
1252 .find(|(ver, _)| **ver <= prot_ver)
1253 .expect("Prev entry must exist")
1254 .1
1255 .clone()
1256 };
1257
1258 for (id, versions) in system_package_revisions.iter() {
1259 for ver in versions.iter().rev() {
1261 if ver.1 == tx_digest {
1262 working.insert(*id, ver.0);
1264 break;
1265 }
1266 }
1267 }
1268 self.protocol_version_system_package_table
1269 .insert(prot_ver, working);
1270 }
1271 Ok(())
1272 }
1273
1274 pub async fn system_package_versions(
1275 &self,
1276 ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1277 {
1278 let system_package_ids = Self::system_package_ids(
1279 *self
1280 .protocol_version_epoch_table
1281 .keys()
1282 .peekable()
1283 .last()
1284 .expect("Protocol version epoch table not populated"),
1285 );
1286 let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1287
1288 let mut mapping = BTreeMap::new();
1289
1290 while !system_package_objs.is_empty() {
1292 let previous_txs: Vec<_> = system_package_objs
1295 .iter()
1296 .map(|o| (o.compute_object_reference(), o.previous_transaction))
1297 .collect();
1298
1299 previous_txs.iter().for_each(|((id, ver, _), tx)| {
1300 mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1301 });
1302
1303 let previous_ver_refs: Vec<_> = previous_txs
1306 .iter()
1307 .filter_map(|(q, _)| {
1308 let prev_ver = u64::from(q.1) - 1;
1309 if prev_ver == 0 {
1310 None
1311 } else {
1312 Some((q.0, SequenceNumber::from(prev_ver)))
1313 }
1314 })
1315 .collect();
1316 system_package_objs = match self.multi_download(&previous_ver_refs).await {
1317 Ok(packages) => packages,
1318 Err(ReplayEngineError::ObjectNotExist { id }) => {
1319 warn!(
1323 "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1324 id
1325 );
1326 break;
1327 }
1328 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1329 warn!(
1333 "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1334 id, version
1335 );
1336 break;
1337 }
1338 Err(ReplayEngineError::ObjectVersionTooHigh {
1339 id,
1340 asked_version,
1341 latest_version,
1342 }) => {
1343 warn!(
1344 "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1345 id, asked_version, latest_version
1346 );
1347 break;
1348 }
1349 Err(ReplayEngineError::ObjectDeleted {
1350 id,
1351 version,
1352 digest,
1353 }) => {
1354 warn!(
1358 "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1359 id, version, digest
1360 );
1361 break;
1362 }
1363 Err(e) => return Err(e),
1364 };
1365 }
1366 Ok(mapping)
1367 }
1368
1369 pub async fn get_protocol_config(
1370 &self,
1371 epoch_id: EpochId,
1372 chain: Chain,
1373 ) -> Result<ProtocolConfig, ReplayEngineError> {
1374 match self.protocol_version {
1375 Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1376 Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1377 None => self
1378 .protocol_version_epoch_table
1379 .iter()
1380 .rev()
1381 .find(|(_, rg)| epoch_id >= rg.epoch_start)
1382 .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1383 .unwrap_or_else(|| {
1384 Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1385 }),
1386 }
1387 }
1388
1389 pub async fn checkpoints_for_epoch(
1390 &self,
1391 epoch_id: u64,
1392 ) -> Result<(u64, u64), ReplayEngineError> {
1393 let epoch_change_events = self
1394 .fetcher
1395 .get_epoch_change_events(true)
1396 .await?
1397 .into_iter()
1398 .collect::<Vec<_>>();
1399 let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1400 (0, 1)
1401 } else {
1402 let idx = epoch_change_events
1403 .iter()
1404 .position(|ev| match extract_epoch_and_version(ev.clone()) {
1405 Ok((epoch, _)) => epoch == epoch_id,
1406 Err(_) => false,
1407 })
1408 .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1409 let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1410 (
1411 self.fetcher
1412 .get_transaction(&epoch_change_tx)
1413 .await?
1414 .checkpoint
1415 .unwrap_or_else(|| {
1416 panic!(
1417 "Checkpoint for transaction {epoch_change_tx} not present. Could be due to pruning"
1418 )
1419 }),
1420 idx,
1421 )
1422 };
1423
1424 let next_epoch_change_tx = epoch_change_events
1425 .get(start_epoch_idx + 1)
1426 .map(|v| v.id.tx_digest)
1427 .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1428
1429 let next_epoch_checkpoint = self
1430 .fetcher
1431 .get_transaction(&next_epoch_change_tx)
1432 .await?
1433 .checkpoint
1434 .unwrap_or_else(|| {
1435 panic!(
1436 "Checkpoint for transaction {next_epoch_change_tx} not present. Could be due to pruning"
1437 )
1438 });
1439
1440 Ok((start_checkpoint, next_epoch_checkpoint - 1))
1441 }
1442
1443 pub async fn get_epoch_start_timestamp_and_rgp(
1444 &self,
1445 epoch_id: u64,
1446 tx_digest: &TransactionDigest,
1447 ) -> Result<(u64, u64), ReplayEngineError> {
1448 if epoch_id == 0 {
1449 return Err(ReplayEngineError::TransactionNotSupported {
1450 digest: *tx_digest,
1451 reason: "Transactions from epoch 0 not supported".to_string(),
1452 });
1453 }
1454 self.fetcher
1455 .get_epoch_start_timestamp_and_rgp(epoch_id)
1456 .await
1457 }
1458
1459 fn add_config_objects_if_needed(
1460 &self,
1461 status: &IotaExecutionStatus,
1462 ) -> Vec<(ObjectID, SequenceNumber)> {
1463 match parse_effect_error_for_denied_coins(status) {
1464 Some(coin_type) => {
1465 let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1466 panic!(
1467 "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1468 );
1469 };
1470 if !config_id_and_version
1472 .iter()
1473 .any(|(id, _)| id == &IOTA_DENY_LIST_OBJECT_ID)
1474 {
1475 let deny_list_oid_version = self.download_latest_object(&IOTA_DENY_LIST_OBJECT_ID)
1476 .ok()
1477 .flatten()
1478 .expect("Unable to download the deny list object for a transaction that requires it")
1479 .version();
1480 config_id_and_version.push((IOTA_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1481 }
1482 config_id_and_version
1483 }
1484 None => vec![],
1485 }
1486 }
1487
1488 async fn resolve_tx_components(
1489 &self,
1490 tx_digest: &TransactionDigest,
1491 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1492 assert!(self.is_remote_replay());
1493 let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1495 let sender = match tx_info.clone().transaction.unwrap().data {
1496 iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.sender,
1497 };
1498 let IotaTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1499
1500 let config_objects = self.add_config_objects_if_needed(effects.status());
1501
1502 let raw_tx_bytes = tx_info.clone().raw_transaction;
1503 let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1504 let input_objs = orig_tx
1505 .transaction_data()
1506 .input_objects()
1507 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1508 let tx_kind_orig = orig_tx.transaction_data().kind();
1509
1510 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1512
1513 let shared_object_refs: Vec<ObjectRef> = effects
1514 .shared_objects()
1515 .iter()
1516 .map(|so_ref| {
1517 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1518 unimplemented!(
1519 "Replay of deleted shared object transactions is not supported yet"
1520 );
1521 } else {
1522 so_ref.to_object_ref()
1523 }
1524 })
1525 .collect();
1526 let gas_data = match tx_info.clone().transaction.unwrap().data {
1527 iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.gas_data,
1528 };
1529 let gas_object_refs: Vec<_> = gas_data
1530 .payment
1531 .iter()
1532 .map(|obj_ref| obj_ref.to_object_ref())
1533 .collect();
1534 let receiving_objs = orig_tx
1535 .transaction_data()
1536 .receiving_objects()
1537 .into_iter()
1538 .map(|(obj_id, version, _)| (obj_id, version))
1539 .collect();
1540
1541 let epoch_id = effects.executed_epoch;
1542 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1543
1544 let (epoch_start_timestamp, reference_gas_price) = self
1546 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1547 .await?;
1548
1549 Ok(OnChainTransactionInfo {
1550 kind: tx_kind_orig.clone(),
1551 sender,
1552 modified_at_versions,
1553 input_objects: input_objs,
1554 shared_object_refs,
1555 gas: gas_object_refs,
1556 gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1557 gas_budget: gas_data.budget,
1558 gas_price: gas_data.price,
1559 executed_epoch: epoch_id,
1560 dependencies: effects.dependencies().to_vec(),
1561 effects: IotaTransactionBlockEffects::V1(effects),
1562 receiving_objs,
1563 config_objects,
1564 protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1568 tx_digest: *tx_digest,
1569 epoch_start_timestamp,
1570 sender_signed_data: orig_tx.clone(),
1571 reference_gas_price,
1572 chain,
1573 })
1574 }
1575
1576 async fn resolve_tx_components_from_dump(
1577 &self,
1578 tx_digest: &TransactionDigest,
1579 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1580 assert!(!self.is_remote_replay());
1581
1582 let dp = self.fetcher.as_node_state_dump();
1583
1584 let sender = dp
1585 .node_state_dump
1586 .sender_signed_data
1587 .transaction_data()
1588 .sender();
1589 let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1590 let effects = dp.node_state_dump.computed_effects.clone();
1591 let effects = IotaTransactionBlockEffects::try_from(effects).unwrap();
1592 let config_objects = self.add_config_objects_if_needed(effects.status());
1595
1596 let input_objs = orig_tx
1600 .transaction_data()
1601 .input_objects()
1602 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1603 let tx_kind_orig = orig_tx.transaction_data().kind();
1604
1605 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1607
1608 let shared_object_refs: Vec<ObjectRef> = effects
1609 .shared_objects()
1610 .iter()
1611 .map(|so_ref| {
1612 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1613 unimplemented!(
1614 "Replay of deleted shared object transactions is not supported yet"
1615 );
1616 } else {
1617 so_ref.to_object_ref()
1618 }
1619 })
1620 .collect();
1621 let receiving_objs = orig_tx
1622 .transaction_data()
1623 .receiving_objects()
1624 .into_iter()
1625 .map(|(obj_id, version, _)| (obj_id, version))
1626 .collect();
1627
1628 let epoch_id = dp.node_state_dump.executed_epoch;
1629
1630 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1631
1632 let protocol_config =
1633 ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1634 let (epoch_start_timestamp, reference_gas_price) = self
1636 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1637 .await?;
1638 let gas_data = orig_tx.transaction_data().gas_data();
1639 let gas_object_refs: Vec<_> = gas_data.clone().payment;
1640
1641 Ok(OnChainTransactionInfo {
1642 kind: tx_kind_orig.clone(),
1643 sender,
1644 modified_at_versions,
1645 input_objects: input_objs,
1646 shared_object_refs,
1647 gas: gas_object_refs,
1648 gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1649 gas_budget: gas_data.budget,
1650 gas_price: gas_data.price,
1651 executed_epoch: epoch_id,
1652 dependencies: effects.dependencies().to_vec(),
1653 effects,
1654 receiving_objs,
1655 config_objects,
1656 protocol_version: protocol_config.version,
1657 tx_digest: *tx_digest,
1658 epoch_start_timestamp,
1659 sender_signed_data: orig_tx.clone(),
1660 reference_gas_price,
1661 chain,
1662 })
1663 }
1664
1665 async fn resolve_download_input_objects(
1666 &mut self,
1667 tx_info: &OnChainTransactionInfo,
1668 deleted_shared_objects: Vec<ObjectRef>,
1669 ) -> Result<InputObjects, ReplayEngineError> {
1670 let mut package_inputs = vec![];
1672 let mut imm_owned_inputs = vec![];
1673 let mut shared_inputs = vec![];
1674 let mut deleted_shared_info_map = BTreeMap::new();
1675
1676 if !deleted_shared_objects.is_empty() {
1680 for tx_digest in tx_info.dependencies.iter() {
1681 let tx_info = self.resolve_tx_components(tx_digest).await?;
1682 for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1683 deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1684 }
1685 }
1686 }
1687
1688 tx_info
1689 .input_objects
1690 .iter()
1691 .map(|kind| match kind {
1692 InputObjectKind::MovePackage(i) => {
1693 package_inputs.push(*i);
1694 Ok(())
1695 }
1696 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1697 imm_owned_inputs.push((o_ref.0, o_ref.1));
1698 Ok(())
1699 }
1700 InputObjectKind::SharedMoveObject {
1701 id,
1702 initial_shared_version: _,
1703 mutable: _,
1704 } if !deleted_shared_info_map.contains_key(id) => {
1705 if let Some(o) = self
1707 .storage
1708 .live_objects_store
1709 .lock()
1710 .expect("Can't lock")
1711 .get(id)
1712 {
1713 shared_inputs.push(o.clone());
1714 Ok(())
1715 } else {
1716 Err(ReplayEngineError::InternalCacheInvariantViolation {
1717 id: *id,
1718 version: None,
1719 })
1720 }
1721 }
1722 _ => Ok(()),
1723 })
1724 .collect::<Result<Vec<_>, _>>()?;
1725
1726 let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1728
1729 in_objs.extend(
1732 self.multi_download_relevant_packages_and_store(
1733 package_inputs,
1734 tx_info.protocol_version.as_u64(),
1735 )
1736 .await?,
1737 );
1738 in_objs.extend(shared_inputs);
1740
1741 let resolved_input_objs = tx_info
1744 .input_objects
1745 .iter()
1746 .flat_map(|kind| match kind {
1747 InputObjectKind::MovePackage(i) => {
1748 Some(ObjectReadResult::new(
1750 *kind,
1751 self.storage
1752 .package_cache
1753 .lock()
1754 .expect("Cannot lock")
1755 .get(i)
1756 .unwrap_or(
1757 &self
1758 .download_latest_object(i)
1759 .expect("Object download failed")
1760 .expect("Object not found on chain"),
1761 )
1762 .clone()
1763 .into(),
1764 ))
1765 }
1766 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1767 *kind,
1768 self.storage
1769 .object_version_cache
1770 .lock()
1771 .expect("Cannot lock")
1772 .get(&(o_ref.0, o_ref.1))
1773 .unwrap()
1774 .clone()
1775 .into(),
1776 )),
1777 InputObjectKind::SharedMoveObject { id, .. }
1778 if !deleted_shared_info_map.contains_key(id) =>
1779 {
1780 Some(ObjectReadResult::new(
1782 *kind,
1783 self.storage
1784 .live_objects_store
1785 .lock()
1786 .expect("Can't lock")
1787 .get(id)
1788 .unwrap()
1789 .clone()
1790 .into(),
1791 ))
1792 }
1793 InputObjectKind::SharedMoveObject { id, .. } => {
1794 let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1795 Some(ObjectReadResult::new(
1796 *kind,
1797 ObjectReadResultKind::DeletedSharedObject(*version, *digest),
1798 ))
1799 }
1800 })
1801 .collect();
1802
1803 Ok(InputObjects::new(resolved_input_objs))
1804 }
1805
1806 async fn initialize_execution_env_state(
1809 &mut self,
1810 tx_info: &OnChainTransactionInfo,
1811 ) -> Result<InputObjects, ReplayEngineError> {
1812 self.current_protocol_version = tx_info.protocol_version.as_u64();
1814
1815 self.multi_download_and_store(&tx_info.modified_at_versions)
1817 .await?;
1818
1819 let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1820 .shared_object_refs
1821 .iter()
1822 .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1823
1824 let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1826 self.multi_download_and_store(&shared_refs).await?;
1827
1828 let gas_refs: Vec<_> = tx_info
1831 .gas
1832 .iter()
1833 .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
1834 .collect();
1835 self.multi_download_and_store(&gas_refs).await?;
1836
1837 let input_objs = self
1839 .resolve_download_input_objects(tx_info, deleted_shared_refs)
1840 .await?;
1841
1842 self.multi_download_and_store(&tx_info.receiving_objs)
1844 .await?;
1845
1846 self.multi_download_and_store(&tx_info.config_objects)
1848 .await?;
1849
1850 let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1854 self.multi_download_and_store(&loaded_child_refs).await?;
1855 tokio::task::yield_now().await;
1856
1857 Ok(input_objs)
1858 }
1859}
1860
1861impl BackingPackageStore for LocalExec {
1865 fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1869 fn inner(self_: &LocalExec, package_id: &ObjectID) -> IotaResult<Option<Object>> {
1870 self_
1872 .get_or_download_object(package_id, true )
1873 .map_err(|e| IotaError::Storage(e.to_string()))
1874 }
1875
1876 let res = inner(self, package_id);
1877 self.exec_store_events
1878 .lock()
1879 .expect("Unable to lock events list")
1880 .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1881 package_id: *package_id,
1882 result: res.clone(),
1883 });
1884 res.map(|o| o.map(PackageObject::new))
1885 }
1886}
1887
1888impl ChildObjectResolver for LocalExec {
1889 fn read_child_object(
1892 &self,
1893 parent: &ObjectID,
1894 child: &ObjectID,
1895 child_version_upper_bound: SequenceNumber,
1896 ) -> IotaResult<Option<Object>> {
1897 fn inner(
1898 self_: &LocalExec,
1899 parent: &ObjectID,
1900 child: &ObjectID,
1901 child_version_upper_bound: SequenceNumber,
1902 ) -> IotaResult<Option<Object>> {
1903 let child_object =
1904 match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1905 None => return Ok(None),
1906 Some(o) => o,
1907 };
1908 let child_version = child_object.version();
1909 if child_object.version() > child_version_upper_bound {
1910 return Err(IotaError::Unknown(format!(
1911 "Invariant Violation. Replay loaded child_object {child} at version \
1912 {child_version} but expected the version to be <= {child_version_upper_bound}"
1913 )));
1914 }
1915 let parent = *parent;
1916 if child_object.owner != Owner::ObjectOwner(parent.into()) {
1917 return Err(IotaError::InvalidChildObjectAccess {
1918 object: *child,
1919 given_parent: parent,
1920 actual_owner: child_object.owner,
1921 });
1922 }
1923 Ok(Some(child_object))
1924 }
1925
1926 let res = inner(self, parent, child, child_version_upper_bound);
1927 self.exec_store_events
1928 .lock()
1929 .expect("Unable to lock events list")
1930 .push(
1931 ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1932 parent: *parent,
1933 child: *child,
1934 result: res.clone(),
1935 },
1936 );
1937 res
1938 }
1939
1940 fn get_object_received_at_version(
1941 &self,
1942 owner: &ObjectID,
1943 receiving_object_id: &ObjectID,
1944 receive_object_at_version: SequenceNumber,
1945 _epoch_id: EpochId,
1946 ) -> IotaResult<Option<Object>> {
1947 fn inner(
1948 self_: &LocalExec,
1949 owner: &ObjectID,
1950 receiving_object_id: &ObjectID,
1951 receive_object_at_version: SequenceNumber,
1952 ) -> IotaResult<Option<Object>> {
1953 let recv_object = match self_.try_get_object(receiving_object_id)? {
1954 None => return Ok(None),
1955 Some(o) => o,
1956 };
1957 if recv_object.version() != receive_object_at_version {
1958 return Err(IotaError::Unknown(format!(
1959 "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
1960 {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
1961 )));
1962 }
1963 if recv_object.owner != Owner::AddressOwner((*owner).into()) {
1964 return Ok(None);
1965 }
1966 Ok(Some(recv_object))
1967 }
1968
1969 let res = inner(self, owner, receiving_object_id, receive_object_at_version);
1970 self.exec_store_events
1971 .lock()
1972 .expect("Unable to lock events list")
1973 .push(ExecutionStoreEvent::ReceiveObject {
1974 owner: *owner,
1975 receive: *receiving_object_id,
1976 receive_at_version: receive_object_at_version,
1977 result: res.clone(),
1978 });
1979 res
1980 }
1981}
1982
1983impl ResourceResolver for LocalExec {
1984 type Error = IotaError;
1985
1986 fn get_resource(
1990 &self,
1991 address: &AccountAddress,
1992 type_: &StructTag,
1993 ) -> IotaResult<Option<Vec<u8>>> {
1994 fn inner(
1995 self_: &LocalExec,
1996 address: &AccountAddress,
1997 type_: &StructTag,
1998 ) -> IotaResult<Option<Vec<u8>>> {
1999 let Some(object) = self_.get_or_download_object(
2001 &ObjectID::from(*address),
2002 false, )?
2004 else {
2005 return Ok(None);
2006 };
2007
2008 match &object.data {
2009 Data::Move(m) => {
2010 assert!(
2011 m.is_type(type_),
2012 "Invariant violation: ill-typed object in storage \
2013 or bad object request from caller"
2014 );
2015 Ok(Some(m.contents().to_vec()))
2016 }
2017 other => unimplemented!(
2018 "Bad object lookup: expected Move object, but got {:#?}",
2019 other
2020 ),
2021 }
2022 }
2023
2024 let res = inner(self, address, type_);
2025 self.exec_store_events
2026 .lock()
2027 .expect("Unable to lock events list")
2028 .push(ExecutionStoreEvent::ResourceResolverGetResource {
2029 address: *address,
2030 typ: type_.clone(),
2031 result: res.clone(),
2032 });
2033 res
2034 }
2035}
2036
2037impl ModuleResolver for LocalExec {
2038 type Error = IotaError;
2039
2040 fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2043 fn inner(self_: &LocalExec, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2044 get_module(self_, module_id)
2045 }
2046
2047 let res = inner(self, module_id);
2048 self.exec_store_events
2049 .lock()
2050 .expect("Unable to lock events list")
2051 .push(ExecutionStoreEvent::ModuleResolverGetModule {
2052 module_id: module_id.clone(),
2053 result: res.clone(),
2054 });
2055 res
2056 }
2057}
2058
2059impl ModuleResolver for &mut LocalExec {
2060 type Error = IotaError;
2061
2062 fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2063 (**self).get_module(module_id)
2066 }
2067}
2068
2069impl ObjectStore for LocalExec {
2070 fn try_get_object(
2073 &self,
2074 object_id: &ObjectID,
2075 ) -> iota_types::storage::error::Result<Option<Object>> {
2076 let res = self
2077 .storage
2078 .live_objects_store
2079 .lock()
2080 .expect("Can't lock")
2081 .get(object_id)
2082 .cloned();
2083 self.exec_store_events
2084 .lock()
2085 .expect("Unable to lock events list")
2086 .push(ExecutionStoreEvent::ObjectStoreGetObject {
2087 object_id: *object_id,
2088 result: Ok(res.clone()),
2089 });
2090 Ok(res)
2091 }
2092
2093 fn try_get_object_by_key(
2096 &self,
2097 object_id: &ObjectID,
2098 version: VersionNumber,
2099 ) -> iota_types::storage::error::Result<Option<Object>> {
2100 let res = self
2101 .storage
2102 .live_objects_store
2103 .lock()
2104 .expect("Can't lock")
2105 .get(object_id)
2106 .and_then(|obj| {
2107 if obj.version() == version {
2108 Some(obj.clone())
2109 } else {
2110 None
2111 }
2112 });
2113
2114 self.exec_store_events
2115 .lock()
2116 .expect("Unable to lock events list")
2117 .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2118 object_id: *object_id,
2119 version,
2120 result: Ok(res.clone()),
2121 });
2122
2123 Ok(res)
2124 }
2125}
2126
2127impl ObjectStore for &mut LocalExec {
2128 fn try_get_object(
2129 &self,
2130 object_id: &ObjectID,
2131 ) -> iota_types::storage::error::Result<Option<Object>> {
2132 (**self).try_get_object(object_id)
2135 }
2136
2137 fn try_get_object_by_key(
2138 &self,
2139 object_id: &ObjectID,
2140 version: VersionNumber,
2141 ) -> iota_types::storage::error::Result<Option<Object>> {
2142 (**self).try_get_object_by_key(object_id, version)
2145 }
2146}
2147
2148impl GetModule for LocalExec {
2149 type Error = IotaError;
2150 type Item = CompiledModule;
2151
2152 fn get_module_by_id(&self, id: &ModuleId) -> IotaResult<Option<Self::Item>> {
2153 let res = get_module_by_id(self, id);
2154
2155 self.exec_store_events
2156 .lock()
2157 .expect("Unable to lock events list")
2158 .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2159 id: id.clone(),
2160 result: res.clone(),
2161 });
2162 res
2163 }
2164}
2165
2166pub fn get_executor(
2169 executor_version_override: Option<i64>,
2170 protocol_config: &ProtocolConfig,
2171 _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2172 enable_profiler: Option<PathBuf>,
2173) -> Arc<dyn Executor + Send + Sync> {
2174 let protocol_config = executor_version_override
2175 .map(|q| {
2176 let ver = if q < 0 {
2177 ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2178 } else {
2179 q as u64
2180 };
2181
2182 let mut c = protocol_config.clone();
2183 c.set_execution_version_for_testing(ver);
2184 c
2185 })
2186 .unwrap_or(protocol_config.clone());
2187
2188 let silent = true;
2189 iota_execution::executor(&protocol_config, silent, enable_profiler)
2190 .expect("Creating an executor should not fail here")
2191}
2192
2193fn parse_effect_error_for_denied_coins(status: &IotaExecutionStatus) -> Option<String> {
2194 let IotaExecutionStatus::Failure { error } = status else {
2195 return None;
2196 };
2197 parse_denied_error_string(error)
2198}
2199
2200fn parse_denied_error_string(error: &str) -> Option<String> {
2201 let regulated_regex = regex::Regex::new(
2202 r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2203 )
2204 .unwrap();
2205
2206 let caps = regulated_regex.captures(error)?;
2207 Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2208}
2209
#[cfg(test)]
mod tests {
    use super::parse_denied_error_string;

    /// Both regulated-coin error shapes must yield the quoted coin type.
    #[test]
    fn test_regex_regulated_coin_errors() {
        let expected_string =
            "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";
        let samples = [
            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
        ];

        for sample in &samples {
            assert_eq!(
                parse_denied_error_string(sample).as_deref(),
                Some(expected_string)
            );
        }
    }
}