1use std::{
6 collections::{BTreeMap, HashSet},
7 path::PathBuf,
8 sync::{Arc, Mutex},
9};
10
11use futures::executor::block_on;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_core::authority::NodeStateDump;
14use iota_execution::Executor;
15use iota_framework::BuiltInFramework;
16use iota_json_rpc_types::{
17 IotaExecutionStatus, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI,
18};
19use iota_protocol_config::{Chain, ProtocolConfig};
20use iota_sdk::{IotaClient, IotaClientBuilder};
21use iota_types::{
22 IOTA_DENY_LIST_OBJECT_ID,
23 base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
24 committee::EpochId,
25 digests::{ObjectDigest, TransactionDigest},
26 error::{ExecutionError, IotaError, IotaResult},
27 executable_transaction::VerifiedExecutableTransaction,
28 gas::IotaGasStatus,
29 in_memory_storage::InMemoryStorage,
30 inner_temporary_store::InnerTemporaryStore,
31 message_envelope::Message,
32 metrics::LimitsMetrics,
33 object::{Data, Object, Owner},
34 storage::{
35 BackingPackageStore, ChildObjectResolver, ObjectStore, PackageObject, get_module,
36 get_module_by_id,
37 },
38 transaction::{
39 CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
40 SenderSignedData, Transaction, TransactionDataAPI, TransactionKind,
41 TransactionKind::ProgrammableTransaction, VerifiedTransaction,
42 },
43};
44use move_binary_format::CompiledModule;
45use move_bytecode_utils::module_cache::GetModule;
46use move_core_types::{
47 account_address::AccountAddress,
48 language_storage::{ModuleId, StructTag},
49 resolver::{ModuleResolver, ResourceResolver},
50};
51use prometheus::Registry;
52use serde::{Deserialize, Serialize};
53use similar::{ChangeTag, TextDiff};
54use tracing::{error, info, trace, warn};
55
56use crate::{
57 chain_from_chain_id,
58 data_fetcher::{
59 DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
60 },
61 displays::{
62 Pretty,
63 transaction_displays::{FullPTB, transform_command_results_to_annotated},
64 },
65 types::*,
66};
67
/// Result of replaying one transaction locally, bundled with the on-chain
/// record it is compared against.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExecutionSandboxState {
    /// Information about the transaction as fetched for the replay; its
    /// `effects` are what `check_effects` compares the local effects with.
    pub transaction_info: OnChainTransactionInfo,
    /// Objects that were available to the local execution.
    pub required_objects: Vec<Object>,
    /// Temporary store returned by the local executor, when one was kept.
    /// Skipped by serde.
    #[serde(skip)]
    pub local_exec_temporary_store: Option<InnerTemporaryStore>,
    /// Effects produced by the local execution.
    pub local_exec_effects: IotaTransactionBlockEffects,
    /// Outcome reported by the local executor (`Ok(())` or the execution
    /// error), when available. Skipped by serde.
    #[serde(skip)]
    pub local_exec_status: Option<Result<(), ExecutionError>>,
}
86
87impl ExecutionSandboxState {
88 pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
89 if self.transaction_info.effects != self.local_exec_effects {
90 error!("Replay tool forked {}", self.transaction_info.tx_digest);
91 let diff = self.diff_effects();
92 println!("{diff}");
93 return Err(ReplayEngineError::EffectsForked {
94 digest: self.transaction_info.tx_digest,
95 diff: format!("\n{diff}"),
96 on_chain: Box::new(self.transaction_info.effects.clone()),
97 local: Box::new(self.local_exec_effects.clone()),
98 });
99 }
100 Ok(())
101 }
102
103 pub fn diff_effects(&self) -> String {
105 let eff1 = &self.transaction_info.effects;
106 let eff2 = &self.local_exec_effects;
107 let on_chain_str = format!("{eff1:#?}");
108 let local_chain_str = format!("{eff2:#?}");
109 let mut res = vec![];
110
111 let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
112 for change in diff.iter_all_changes() {
113 let sign = match change.tag() {
114 ChangeTag::Delete => "---",
115 ChangeTag::Insert => "+++",
116 ChangeTag::Equal => " ",
117 };
118 res.push(format!("{sign}{change}"));
119 }
120
121 res.join("")
122 }
123}
124
/// Summary of the span of epochs and checkpoints during which one protocol
/// version was in effect.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolVersionSummary {
    /// The protocol version this summary describes.
    pub protocol_version: u64,
    /// First epoch recorded for this protocol version.
    pub epoch_start: u64,
    /// Last epoch recorded for this protocol version.
    pub epoch_end: u64,
    /// First checkpoint recorded for this protocol version, if known.
    pub checkpoint_start: Option<u64>,
    /// Last checkpoint recorded for this protocol version, if known.
    pub checkpoint_end: Option<u64>,
    /// Digest of the transaction that started this protocol version's range
    /// (the genesis transaction for the first range).
    pub epoch_change_tx: TransactionDigest,
}
140
/// In-memory object caches shared by the replay engine.
///
/// Every map sits behind `Arc<Mutex<..>>`, so cloning a `Storage` yields a
/// handle to the same underlying maps.
#[derive(Clone)]
pub struct Storage {
    /// Objects keyed by id only; a later insert for the same id replaces the
    /// earlier one.
    pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,

    /// Package objects keyed by package id.
    pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
    /// Objects keyed by their exact `(id, version)` pair.
    pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
}
156
157impl std::fmt::Display for Storage {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 writeln!(f, "Live object store")?;
160 for (id, obj) in self
161 .live_objects_store
162 .lock()
163 .expect("Unable to lock")
164 .iter()
165 {
166 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
167 }
168 writeln!(f, "Package cache")?;
169 for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
170 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
171 }
172 writeln!(f, "Object version cache")?;
173 for (id, _) in self
174 .object_version_cache
175 .lock()
176 .expect("Unable to lock")
177 .iter()
178 {
179 writeln!(f, "{}: {}", id.0, id.1)?;
180 }
181
182 write!(f, "")
183 }
184}
185
186impl Storage {
187 pub fn default() -> Self {
188 Self {
189 live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
190 package_cache: Arc::new(Mutex::new(BTreeMap::new())),
191 object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
192 }
193 }
194
195 pub fn all_objects(&self) -> Vec<Object> {
196 self.live_objects_store
197 .lock()
198 .expect("Unable to lock")
199 .values()
200 .cloned()
201 .chain(
202 self.package_cache
203 .lock()
204 .expect("Unable to lock")
205 .iter()
206 .map(|(_, obj)| obj.clone()),
207 )
208 .chain(
209 self.object_version_cache
210 .lock()
211 .expect("Unable to lock")
212 .iter()
213 .map(|(_, obj)| obj.clone()),
214 )
215 .collect::<Vec<_>>()
216 }
217}
218
/// Local replay engine: fetches transaction data and objects through a
/// `Fetchers` backend and re-executes transactions against in-memory storage.
#[derive(Clone)]
pub struct LocalExec {
    /// RPC client; `Some` when built for remote replay, `None` when built
    /// from a state dump.
    pub client: Option<IotaClient>,
    /// Protocol version -> summary of the epochs/checkpoints it covered.
    pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
    /// Protocol version -> (system package id -> package version).
    pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
    /// Protocol version the engine currently assumes (0 for a fresh remote
    /// engine, the dump's version when built from a state dump).
    pub current_protocol_version: u64,
    /// Object caches used during execution.
    pub storage: Storage,
    /// Log of store events recorded during execution.
    // NOTE(review): the code that appends to this log is outside this view.
    pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
    /// Limits metrics handed to the executor.
    pub metrics: Arc<LimitsMetrics>,
    /// Backend used to fetch objects, transactions and events.
    pub fetcher: Fetchers,

    /// Optional executor version override forwarded to `get_executor`.
    pub executor_version: Option<i64>,
    /// Optional protocol version override; a negative value makes package
    /// loading use the built-in framework genesis objects.
    pub protocol_version: Option<i64>,
    /// Optional path forwarded to `get_executor` to enable the profiler.
    pub enable_profiler: Option<PathBuf>,
    /// Optional `(object id, version)` overrides supplied by the caller.
    // NOTE(review): the consumer of this field is outside this view.
    pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
    /// Number of retries attempted after an RPC timeout.
    pub num_retries_for_timeout: u32,
    /// Pause between retries after an RPC timeout.
    pub sleep_period_for_timeout: std::time::Duration,
}
253
254impl LocalExec {
255 pub async fn multi_download(
258 &self,
259 objs: &[(ObjectID, SequenceNumber)],
260 ) -> Result<Vec<Object>, ReplayEngineError> {
261 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
262 while num_retries_for_timeout >= 0 {
263 match self.fetcher.multi_get_versioned(objs).await {
264 Ok(objs) => return Ok(objs),
265 Err(ReplayEngineError::IotaRpcRequestTimeout) => {
266 warn!(
267 "RPC request timed out. Retries left {}. Sleeping for {}s",
268 num_retries_for_timeout,
269 self.sleep_period_for_timeout.as_secs()
270 );
271 num_retries_for_timeout -= 1;
272 tokio::time::sleep(self.sleep_period_for_timeout).await;
273 }
274 Err(e) => return Err(e),
275 }
276 }
277 Err(ReplayEngineError::IotaRpcRequestTimeout)
278 }
279 pub async fn multi_download_latest(
282 &self,
283 objs: &[ObjectID],
284 ) -> Result<Vec<Object>, ReplayEngineError> {
285 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
286 while num_retries_for_timeout >= 0 {
287 match self.fetcher.multi_get_latest(objs).await {
288 Ok(objs) => return Ok(objs),
289 Err(ReplayEngineError::IotaRpcRequestTimeout) => {
290 warn!(
291 "RPC request timed out. Retries left {}. Sleeping for {}s",
292 num_retries_for_timeout,
293 self.sleep_period_for_timeout.as_secs()
294 );
295 num_retries_for_timeout -= 1;
296 tokio::time::sleep(self.sleep_period_for_timeout).await;
297 }
298 Err(e) => return Err(e),
299 }
300 }
301 Err(ReplayEngineError::IotaRpcRequestTimeout)
302 }
303
304 pub async fn fetch_loaded_child_refs(
305 &self,
306 tx_digest: &TransactionDigest,
307 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
308 self.fetcher.get_loaded_child_objects(tx_digest).await
310 }
311
312 pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
313 Self::new_for_remote(
314 IotaClientBuilder::default()
315 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
316 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
317 .build(http_url)
318 .await?,
319 None,
320 )
321 .await
322 }
323
324 pub async fn replay_with_network_config(
325 rpc_url: String,
326 tx_digest: TransactionDigest,
327 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
328 use_authority: bool,
329 executor_version: Option<i64>,
330 protocol_version: Option<i64>,
331 enable_profiler: Option<PathBuf>,
332 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
333 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
334 info!("Using RPC URL: {}", rpc_url);
335 LocalExec::new_from_fn_url(&rpc_url)
336 .await?
337 .init_for_execution()
338 .await?
339 .execute_transaction(
340 &tx_digest,
341 expensive_safety_check_config,
342 use_authority,
343 executor_version,
344 protocol_version,
345 enable_profiler,
346 config_and_versions,
347 )
348 .await
349 }
350
351 pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
356 self.populate_protocol_version_tables().await?;
357 tokio::task::yield_now().await;
358 Ok(self)
359 }
360
361 pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
362 Self::new_for_remote(
363 self.client.expect("Remote client not initialized"),
364 Some(self.fetcher.into_remote()),
365 )
366 .await?
367 .init_for_execution()
368 .await
369 }
370
371 pub async fn new_for_remote(
372 client: IotaClient,
373 remote_fetcher: Option<RemoteFetcher>,
374 ) -> Result<Self, ReplayEngineError> {
375 let registry = prometheus::Registry::new();
377 let metrics = Arc::new(LimitsMetrics::new(®istry));
378
379 let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
380
381 Ok(Self {
382 client: Some(client),
383 protocol_version_epoch_table: BTreeMap::new(),
384 protocol_version_system_package_table: BTreeMap::new(),
385 current_protocol_version: 0,
386 exec_store_events: Arc::new(Mutex::new(Vec::new())),
387 metrics,
388 storage: Storage::default(),
389 fetcher: Fetchers::Remote(fetcher),
390 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
392 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
393 executor_version: None,
394 protocol_version: None,
395 enable_profiler: None,
396 config_and_versions: None,
397 })
398 }
399
400 pub async fn new_for_state_dump(
401 path: &str,
402 backup_rpc_url: Option<String>,
403 ) -> Result<Self, ReplayEngineError> {
404 let registry = prometheus::Registry::new();
406 let metrics = Arc::new(LimitsMetrics::new(®istry));
407
408 let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
409 let current_protocol_version = state.protocol_version;
410 let fetcher = match backup_rpc_url {
411 Some(url) => NodeStateDumpFetcher::new(
412 state,
413 Some(RemoteFetcher::new(
414 IotaClientBuilder::default()
415 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
416 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
417 .build(url)
418 .await?,
419 )),
420 ),
421 None => NodeStateDumpFetcher::new(state, None),
422 };
423
424 Ok(Self {
425 client: None,
426 protocol_version_epoch_table: BTreeMap::new(),
427 protocol_version_system_package_table: BTreeMap::new(),
428 current_protocol_version,
429 exec_store_events: Arc::new(Mutex::new(Vec::new())),
430 metrics,
431 storage: Storage::default(),
432 fetcher: Fetchers::NodeStateDump(fetcher),
433 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
435 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
436 executor_version: None,
437 protocol_version: None,
438 enable_profiler: None,
439 config_and_versions: None,
440 })
441 }
442
443 pub async fn multi_download_and_store(
444 &mut self,
445 objs: &[(ObjectID, SequenceNumber)],
446 ) -> Result<Vec<Object>, ReplayEngineError> {
447 let objs = self.multi_download(objs).await?;
448
449 for obj in objs.iter() {
451 let o_ref = obj.compute_object_reference();
452 self.storage
453 .live_objects_store
454 .lock()
455 .expect("Can't lock")
456 .insert(o_ref.0, obj.clone());
457 self.storage
458 .object_version_cache
459 .lock()
460 .expect("Cannot lock")
461 .insert((o_ref.0, o_ref.1), obj.clone());
462 if obj.is_package() {
463 self.storage
464 .package_cache
465 .lock()
466 .expect("Cannot lock")
467 .insert(o_ref.0, obj.clone());
468 }
469 }
470 tokio::task::yield_now().await;
471 Ok(objs)
472 }
473
474 pub async fn multi_download_relevant_packages_and_store(
475 &mut self,
476 objs: Vec<ObjectID>,
477 protocol_version: u64,
478 ) -> Result<Vec<Object>, ReplayEngineError> {
479 let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
480 BuiltInFramework::genesis_objects().collect()
481 } else {
482 let syst_packages =
483 self.system_package_versions_for_protocol_version(protocol_version)?;
484 self.multi_download(&syst_packages).await?
485 };
486
487 let non_system_package_objs: Vec<_> = objs
490 .into_iter()
491 .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
492 .collect();
493 let objs = self
494 .multi_download_latest(&non_system_package_objs)
495 .await?
496 .into_iter()
497 .chain(syst_packages_objs.into_iter());
498
499 for obj in objs.clone() {
500 let o_ref = obj.compute_object_reference();
501 self.storage
504 .object_version_cache
505 .lock()
506 .expect("Cannot lock")
507 .insert((o_ref.0, o_ref.1), obj.clone());
508 if obj.is_package() {
509 self.storage
510 .package_cache
511 .lock()
512 .expect("Cannot lock")
513 .insert(o_ref.0, obj.clone());
514 }
515 }
516 Ok(objs.collect())
517 }
518
519 #[expect(clippy::disallowed_methods)]
521 pub fn download_object(
522 &self,
523 object_id: &ObjectID,
524 version: SequenceNumber,
525 ) -> Result<Object, ReplayEngineError> {
526 if self
527 .storage
528 .object_version_cache
529 .lock()
530 .expect("Cannot lock")
531 .contains_key(&(*object_id, version))
532 {
533 return Ok(self
534 .storage
535 .object_version_cache
536 .lock()
537 .expect("Cannot lock")
538 .get(&(*object_id, version))
539 .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
540 id: *object_id,
541 version: Some(version),
542 })?
543 .clone());
544 }
545
546 let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
547 q.pop().unwrap_or_else(|| {
548 panic!(
549 "Downloaded obj response cannot be empty {:?}",
550 (*object_id, version)
551 )
552 })
553 })?;
554
555 let o_ref = o.compute_object_reference();
556 self.storage
557 .object_version_cache
558 .lock()
559 .expect("Cannot lock")
560 .insert((o_ref.0, o_ref.1), o.clone());
561 Ok(o)
562 }
563
564 #[expect(clippy::disallowed_methods)]
566 pub fn download_latest_object(
567 &self,
568 object_id: &ObjectID,
569 ) -> Result<Option<Object>, ReplayEngineError> {
570 let resp = block_on({
571 self.multi_download_latest(&[*object_id])
573 })
574 .map(|mut q| {
575 q.pop()
576 .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
577 });
578
579 match resp {
580 Ok(v) => Ok(Some(v)),
581 Err(ReplayEngineError::ObjectNotExist { id }) => {
582 error!(
583 "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
584 );
585 Ok(None)
586 }
587 Err(ReplayEngineError::ObjectDeleted {
588 id,
589 version,
590 digest,
591 }) => {
592 error!("Object {id} {version} {digest} was deleted on RPC server.");
593 Ok(None)
594 }
595 Err(err) => Err(ReplayEngineError::IotaRpcError {
596 err: err.to_string(),
597 }),
598 }
599 }
600
601 #[expect(clippy::disallowed_methods)]
602 pub fn download_object_by_upper_bound(
603 &self,
604 object_id: &ObjectID,
605 version_upper_bound: VersionNumber,
606 ) -> Result<Option<Object>, ReplayEngineError> {
607 let local_object = self
608 .storage
609 .live_objects_store
610 .lock()
611 .expect("Can't lock")
612 .get(object_id)
613 .cloned();
614 if local_object.is_some() {
615 return Ok(local_object);
616 }
617 let response = block_on({
618 self.fetcher
619 .get_child_object(object_id, version_upper_bound)
620 });
621 match response {
622 Ok(object) => {
623 let obj_ref = object.compute_object_reference();
624 self.storage
625 .live_objects_store
626 .lock()
627 .expect("Can't lock")
628 .insert(*object_id, object.clone());
629 self.storage
630 .object_version_cache
631 .lock()
632 .expect("Can't lock")
633 .insert((obj_ref.0, obj_ref.1), object.clone());
634 Ok(Some(object))
635 }
636 Err(ReplayEngineError::ObjectNotExist { id }) => {
637 error!(
638 "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
639 );
640 Ok(None)
641 }
642 Err(ReplayEngineError::ObjectDeleted {
643 id,
644 version,
645 digest,
646 }) => {
647 error!("Object {id} {version} {digest} was deleted on RPC server.");
648 Ok(None)
649 }
650 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
653 info!(
654 "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
655 );
656 Ok(None)
657 }
658 Err(err) => Err(ReplayEngineError::IotaRpcError {
659 err: err.to_string(),
660 }),
661 }
662 }
663
664 pub async fn get_checkpoint_txs(
665 &self,
666 checkpoint_id: u64,
667 ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
668 self.fetcher
669 .get_checkpoint_txs(checkpoint_id)
670 .await
671 .map_err(|e| ReplayEngineError::IotaRpcError { err: e.to_string() })
672 }
673
674 pub async fn execute_all_in_checkpoints(
675 &mut self,
676 checkpoint_ids: &[u64],
677 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
678 terminate_early: bool,
679 use_authority: bool,
680 ) -> Result<(u64, u64), ReplayEngineError> {
681 let mut txs = Vec::new();
683 for checkpoint_id in checkpoint_ids {
684 txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
685 }
686 let num = txs.len();
687 let mut succeeded = 0;
688 for tx in txs {
689 match self
690 .execute_transaction(
691 &tx,
692 expensive_safety_check_config.clone(),
693 use_authority,
694 None,
695 None,
696 None,
697 None,
698 )
699 .await
700 .map(|q| q.check_effects())
701 {
702 Err(e) | Ok(Err(e)) => {
703 if terminate_early {
704 return Err(e);
705 }
706 error!("Error executing tx: {}, {:#?}", tx, e);
707 continue;
708 }
709 _ => (),
710 }
711
712 succeeded += 1;
713 }
714 Ok((succeeded, num as u64))
715 }
716
    /// Executes a transaction locally from already-resolved transaction info.
    ///
    /// `override_transaction_kind`, when given, replaces `tx_info.kind` as the
    /// kind that is executed. The gas status is still chosen from
    /// `tx_info.kind` (unmetered for system transactions).
    ///
    /// Returns a sandbox state holding a copy of `tx_info`, every object
    /// currently in `self.storage`, the executor's temporary store, the
    /// converted effects and the execution result.
    ///
    /// # Errors
    /// Fails if the execution environment cannot be initialized or the
    /// effects cannot be converted to `IotaTransactionBlockEffects`.
    ///
    /// # Panics
    /// Panics if the number of shared input objects differs from
    /// `tx_info.shared_object_refs.len()`, or if the gas status cannot be
    /// created.
    pub async fn execution_engine_execute_with_tx_info_impl(
        &mut self,
        tx_info: &OnChainTransactionInfo,
        override_transaction_kind: Option<TransactionKind>,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        let tx_digest = &tx_info.tx_digest;

        // Resolve the input objects for this transaction.
        let input_objects = self.initialize_execution_env_state(tx_info).await?;
        // Sanity check: resolved shared inputs must match the recorded ones.
        assert_eq!(
            &input_objects.filter_shared_objects().len(),
            &tx_info.shared_object_refs.len()
        );
        // Protocol config for the version and chain recorded in `tx_info`.
        let protocol_config =
            &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);

        let metrics = self.metrics.clone();

        // Optional executor version override stored on the engine.
        let ov = self.executor_version;

        let executor = get_executor(
            ov,
            protocol_config,
            expensive_safety_check_config,
            self.enable_profiler.clone(),
        );

        // Expensive checks are always enabled for replay.
        let expensive_checks = true;
        let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
        // Replay runs with an empty certificate deny set.
        let certificate_deny_set = HashSet::new();
        // System transactions run unmetered; everything else uses the
        // recorded budget and prices.
        let gas_status = if tx_info.kind.is_system_tx() {
            IotaGasStatus::new_unmetered()
        } else {
            IotaGasStatus::new(
                tx_info.gas_budget,
                tx_info.gas_price,
                tx_info.reference_gas_price,
                protocol_config,
            )
            .expect("Failed to create gas status")
        };
        let (inner_store, gas_status, effects, result) = executor.execute_transaction_to_effects(
            &self,
            protocol_config,
            metrics.clone(),
            expensive_checks,
            &certificate_deny_set,
            &tx_info.executed_epoch,
            tx_info.epoch_start_timestamp,
            CheckedInputObjects::new_for_replay(input_objects.clone()),
            tx_info.gas.clone(),
            gas_status,
            transaction_kind.clone(),
            tx_info.sender,
            *tx_digest,
            &mut None,
        );

        // Tracing output is best effort: a failure is logged, not propagated.
        if let Err(err) = self.pretty_print_for_tracing(
            &gas_status,
            &executor,
            tx_info,
            &transaction_kind,
            protocol_config,
            metrics,
            expensive_checks,
            input_objects.clone(),
        ) {
            error!("Failed to pretty print for tracing: {:?}", err);
        }

        // Snapshot everything held in storage after execution.
        let all_required_objects = self.storage.all_objects();

        let effects =
            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;

        Ok(ExecutionSandboxState {
            transaction_info: tx_info.clone(),
            required_objects: all_required_objects,
            local_exec_temporary_store: Some(inner_store),
            local_exec_effects: effects,
            local_exec_status: Some(result),
        })
    }
809
    /// Emits trace-level diagnostics for an execution.
    ///
    /// The gas status is traced under target `replay_gas_info`. For
    /// programmable transactions, the PTB and its annotated per-command
    /// results are also traced under target `replay_ptb_info`; those results
    /// come from a dev-inspect run of the same transaction.
    ///
    /// # Errors
    /// Fails if the gas status for the dev-inspect run cannot be created or
    /// the command results cannot be annotated.
    // NOTE(review): the dev-inspect call is an argument of `trace!`, so it
    // presumably only runs when that target is enabled; confirm against
    // `tracing`.
    fn pretty_print_for_tracing(
        &self,
        gas_status: &IotaGasStatus,
        executor: &Arc<dyn Executor + Send + Sync>,
        tx_info: &OnChainTransactionInfo,
        transaction_kind: &TransactionKind,
        protocol_config: &ProtocolConfig,
        metrics: Arc<LimitsMetrics>,
        expensive_checks: bool,
        input_objects: InputObjects,
    ) -> anyhow::Result<()> {
        trace!(target: "replay_gas_info", "{}", Pretty(gas_status));

        // Passed as the final argument of the dev-inspect call below.
        let skip_checks = true;
        // Only programmable transactions have per-command results to show.
        if let ProgrammableTransaction(pt) = transaction_kind {
            trace!(
                target: "replay_ptb_info",
                "{}",
                Pretty(&FullPTB {
                    ptb: pt.clone(),
                    results: transform_command_results_to_annotated(
                        executor,
                        &self.clone(),
                        executor.dev_inspect_transaction(
                            &self,
                            protocol_config,
                            metrics,
                            expensive_checks,
                            &HashSet::new(),
                            &tx_info.executed_epoch,
                            tx_info.epoch_start_timestamp,
                            CheckedInputObjects::new_for_replay(input_objects),
                            tx_info.gas.clone(),
                            IotaGasStatus::new(
                                tx_info.gas_budget,
                                tx_info.gas_price,
                                tx_info.reference_gas_price,
                                protocol_config,
                            )?,
                            transaction_kind.clone(),
                            tx_info.sender,
                            tx_info.sender_signed_data.digest(),
                            skip_checks,
                        )
                        .3
                        .unwrap_or_default(),
                    )?,
                }));
        }
        Ok(())
    }
861
862 pub async fn execution_engine_execute_impl(
864 &mut self,
865 tx_digest: &TransactionDigest,
866 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
867 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
868 if self.is_remote_replay() {
869 assert!(
870 !self.protocol_version_system_package_table.is_empty()
871 || !self.protocol_version_epoch_table.is_empty(),
872 "Required tables not populated. Must call `init_for_execution` before executing transactions"
873 );
874 }
875
876 let tx_info = if self.is_remote_replay() {
877 self.resolve_tx_components(tx_digest).await?
878 } else {
879 self.resolve_tx_components_from_dump(tx_digest).await?
880 };
881 self.execution_engine_execute_with_tx_info_impl(
882 &tx_info,
883 None,
884 expensive_safety_check_config,
885 )
886 .await
887 }
888
    /// Re-executes the transaction of a previous sandbox run through the
    /// certificate input checks, using only the objects that run collected.
    ///
    /// An `InMemoryStorage` built from `pre_run_sandbox.required_objects`
    /// serves as the object store, so no fetcher is involved.
    ///
    /// The returned state reuses the transaction info and required objects of
    /// `pre_run_sandbox`, and carries no temporary store.
    ///
    /// # Errors
    /// Fails if the effects cannot be converted to
    /// `IotaTransactionBlockEffects`.
    ///
    /// # Panics
    /// Panics if the certificate input check fails or the executor cannot be
    /// created (both are unwrapped).
    pub async fn certificate_execute_with_sandbox_state(
        pre_run_sandbox: &ExecutionSandboxState,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        // Execution parameters are taken from the recorded transaction info.
        let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
        let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
        let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
        let protocol_config = ProtocolConfig::get_for_version(
            pre_run_sandbox.transaction_info.protocol_version,
            pre_run_sandbox.transaction_info.chain,
        );
        let required_objects = pre_run_sandbox.required_objects.clone();
        // Object store seeded with exactly the objects from the earlier run.
        let store = InMemoryStorage::new(required_objects.clone());

        let transaction =
            Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());

        let input_objects = store.read_input_objects_for_transaction(&transaction);
        // The transaction is wrapped without re-verification
        // (`new_unchecked`).
        let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
            VerifiedTransaction::new_unchecked(transaction),
            executed_epoch,
        );
        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
            &executable,
            input_objects,
            &protocol_config,
            reference_gas_price,
        )
        .unwrap();
        let (kind, signer, gas) = executable.transaction_data().execution_parts();
        let executor = iota_execution::executor(&protocol_config, true, None).unwrap();
        // The temporary store and final gas status are discarded here.
        let (_, _, effects, exec_res) = executor.execute_transaction_to_effects(
            &store,
            &protocol_config,
            Arc::new(LimitsMetrics::new(&Registry::new())),
            true,
            &HashSet::new(),
            &executed_epoch,
            epoch_start_timestamp,
            input_objects,
            gas,
            gas_status,
            kind,
            signer,
            *executable.digest(),
            &mut None,
        );

        let effects =
            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;

        Ok(ExecutionSandboxState {
            transaction_info: pre_run_sandbox.transaction_info.clone(),
            required_objects,
            // No temporary store is kept for this execution path.
            local_exec_temporary_store: None,
            local_exec_effects: effects,
            local_exec_status: Some(exec_res),
        })
    }
956
957 pub async fn certificate_execute(
961 &mut self,
962 tx_digest: &TransactionDigest,
963 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
964 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
965 let pre_run_sandbox = self
967 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
968 .await?;
969 Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
970 }
971
972 pub async fn execution_engine_execute(
976 &mut self,
977 tx_digest: &TransactionDigest,
978 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
979 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
980 let sandbox_state = self
981 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
982 .await?;
983
984 Ok(sandbox_state)
985 }
986
987 pub async fn execute_state_dump(
988 &mut self,
989 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
990 ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
991 assert!(!self.is_remote_replay());
992
993 let d = match self.fetcher.clone() {
994 Fetchers::NodeStateDump(d) => d,
995 _ => panic!("Invalid fetcher for state dump"),
996 };
997 let tx_digest = d.node_state_dump.clone().tx_digest;
998 let sandbox_state = self
999 .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1000 .await?;
1001
1002 Ok((sandbox_state, d.node_state_dump))
1003 }
1004
1005 pub async fn execute_transaction(
1006 &mut self,
1007 tx_digest: &TransactionDigest,
1008 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1009 use_authority: bool,
1010 executor_version: Option<i64>,
1011 protocol_version: Option<i64>,
1012 enable_profiler: Option<PathBuf>,
1013 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1014 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1015 self.executor_version = executor_version;
1016 self.protocol_version = protocol_version;
1017 self.enable_profiler = enable_profiler;
1018 self.config_and_versions = config_and_versions;
1019 if use_authority {
1020 self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1021 .await
1022 } else {
1023 self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1024 .await
1025 }
1026 }
1027 fn system_package_ids(_protocol_version: u64) -> Vec<ObjectID> {
1028 BuiltInFramework::all_package_ids()
1029 }
1030
1031 pub fn get_or_download_object(
1033 &self,
1034 obj_id: &ObjectID,
1035 package_expected: bool,
1036 ) -> Result<Option<Object>, ReplayEngineError> {
1037 if package_expected {
1038 if let Some(obj) = self
1039 .storage
1040 .package_cache
1041 .lock()
1042 .expect("Cannot lock")
1043 .get(obj_id)
1044 {
1045 return Ok(Some(obj.clone()));
1046 };
1047 } else if let Some(obj) = self
1054 .storage
1055 .live_objects_store
1056 .lock()
1057 .expect("Can't lock")
1058 .get(obj_id)
1059 {
1060 return Ok(Some(obj.clone()));
1061 }
1062
1063 let Some(o) = self.download_latest_object(obj_id)? else {
1064 return Ok(None);
1065 };
1066
1067 if o.is_package() {
1068 assert!(
1069 package_expected,
1070 "Did not expect package but downloaded object is a package: {obj_id}"
1071 );
1072
1073 self.storage
1074 .package_cache
1075 .lock()
1076 .expect("Cannot lock")
1077 .insert(*obj_id, o.clone());
1078 }
1079 let o_ref = o.compute_object_reference();
1080 self.storage
1081 .object_version_cache
1082 .lock()
1083 .expect("Cannot lock")
1084 .insert((o_ref.0, o_ref.1), o.clone());
1085 Ok(Some(o))
1086 }
1087
1088 pub fn is_remote_replay(&self) -> bool {
1089 matches!(self.fetcher, Fetchers::Remote(_))
1090 }
1091
1092 pub fn system_package_versions_for_protocol_version(
1094 &self,
1095 protocol_version: u64,
1096 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1097 match &self.fetcher {
1098 Fetchers::Remote(_) => Ok(self
1099 .protocol_version_system_package_table
1100 .get(&protocol_version)
1101 .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1102 protocol_version,
1103 })?
1104 .clone()
1105 .into_iter()
1106 .collect()),
1107
1108 Fetchers::NodeStateDump(d) => Ok(d
1109 .node_state_dump
1110 .relevant_system_packages
1111 .iter()
1112 .map(|w| (w.id, w.version, w.digest))
1113 .map(|q| (q.0, q.1))
1114 .collect()),
1115 }
1116 }
1117
    /// Builds a map from protocol version to the epoch/checkpoint range in
    /// which that version was in effect, derived from the epoch change events
    /// the fetcher returns.
    ///
    /// The walk starts at epoch 0, protocol version 1, checkpoint 0, with the
    /// first transaction of checkpoint 0 as the range's starting transaction.
    /// Each event whose protocol version differs from the current range
    /// closes that range and opens a new one. The last range is closed after
    /// the loop using the most recent event seen.
    ///
    /// # Errors
    /// Propagates fetcher errors and failures to extract the epoch/version
    /// from an event.
    ///
    /// # Panics
    /// Panics if checkpoint 0 contains no transactions.
    pub async fn protocol_ver_to_epoch_map(
        &self,
    ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
        let mut range_map = BTreeMap::new();
        let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;

        // The first transaction of checkpoint 0 is treated as genesis.
        let mut tx_digest = *self
            .fetcher
            .get_checkpoint_txs(0)
            .await?
            .first()
            .expect("Genesis TX must be in first checkpoint");
        // State of the range currently being built.
        let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
            (0, 1, Some(0u64));

        // State as of the most recently processed event.
        let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
            (start_epoch, start_protocol_version, start_checkpoint);

        // NOTE(review): this re-assignment is a no-op on the values above;
        // presumably kept to satisfy unused-assignment lints. Confirm.
        (start_epoch, start_protocol_version, start_checkpoint) =
            (curr_epoch, curr_protocol_version, curr_checkpoint);

        // Digest of the last epoch change transaction seen so far.
        let mut end_epoch_tx_digest = tx_digest;

        for event in epoch_change_events {
            (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
            end_epoch_tx_digest = event.id.tx_digest;

            // Same protocol version: the current range simply extends.
            if start_protocol_version == curr_protocol_version {
                continue;
            }

            // Version changed: find the checkpoint of the change transaction.
            curr_checkpoint = self
                .fetcher
                .get_transaction(&event.id.tx_digest)
                .await?
                .checkpoint;
            // Close the previous range just before the new epoch/checkpoint.
            range_map.insert(
                start_protocol_version,
                ProtocolVersionSummary {
                    protocol_version: start_protocol_version,
                    epoch_start: start_epoch,
                    epoch_end: curr_epoch - 1,
                    checkpoint_start: start_checkpoint,
                    checkpoint_end: curr_checkpoint.map(|x| x - 1),
                    epoch_change_tx: tx_digest,
                },
            );

            // Open the next range at the change point.
            start_epoch = curr_epoch;
            start_protocol_version = curr_protocol_version;
            tx_digest = event.id.tx_digest;
            start_checkpoint = curr_checkpoint;
        }

        // Close the final (still open) range; its end checkpoint is that of
        // the last epoch change transaction seen.
        range_map.insert(
            curr_protocol_version,
            ProtocolVersionSummary {
                protocol_version: curr_protocol_version,
                epoch_start: start_epoch,
                epoch_end: curr_epoch,
                checkpoint_start: curr_checkpoint,
                checkpoint_end: self
                    .fetcher
                    .get_transaction(&end_epoch_tx_digest)
                    .await?
                    .checkpoint,
                epoch_change_tx: tx_digest,
            },
        );

        Ok(range_map)
    }
1200
1201 pub fn protocol_version_for_epoch(
1202 epoch: u64,
1203 mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1204 ) -> u64 {
1205 let mut version = 1;
1208 for (k, v) in mp.iter().rev() {
1209 if v.1 <= epoch {
1210 version = *k;
1211 break;
1212 }
1213 }
1214 version
1215 }
1216
1217 pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1218 self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1219
1220 let system_package_revisions = self.system_package_versions().await?;
1221
1222 for (
1225 prot_ver,
1226 ProtocolVersionSummary {
1227 epoch_change_tx: tx_digest,
1228 ..
1229 },
1230 ) in self.protocol_version_epoch_table.clone()
1231 {
1232 let mut working = if prot_ver <= 1 {
1234 BTreeMap::new()
1235 } else {
1236 self.protocol_version_system_package_table
1237 .iter()
1238 .rev()
1239 .find(|(ver, _)| **ver <= prot_ver)
1240 .expect("Prev entry must exist")
1241 .1
1242 .clone()
1243 };
1244
1245 for (id, versions) in system_package_revisions.iter() {
1246 for ver in versions.iter().rev() {
1248 if ver.1 == tx_digest {
1249 working.insert(*id, ver.0);
1251 break;
1252 }
1253 }
1254 }
1255 self.protocol_version_system_package_table
1256 .insert(prot_ver, working);
1257 }
1258 Ok(())
1259 }
1260
1261 pub async fn system_package_versions(
1262 &self,
1263 ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1264 {
1265 let system_package_ids = Self::system_package_ids(
1266 *self
1267 .protocol_version_epoch_table
1268 .keys()
1269 .peekable()
1270 .last()
1271 .expect("Protocol version epoch table not populated"),
1272 );
1273 let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1274
1275 let mut mapping = BTreeMap::new();
1276
1277 while !system_package_objs.is_empty() {
1279 let previous_txs: Vec<_> = system_package_objs
1282 .iter()
1283 .map(|o| (o.compute_object_reference(), o.previous_transaction))
1284 .collect();
1285
1286 previous_txs.iter().for_each(|((id, ver, _), tx)| {
1287 mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1288 });
1289
1290 let previous_ver_refs: Vec<_> = previous_txs
1293 .iter()
1294 .filter_map(|(q, _)| {
1295 let prev_ver = u64::from(q.1) - 1;
1296 if prev_ver == 0 {
1297 None
1298 } else {
1299 Some((q.0, SequenceNumber::from(prev_ver)))
1300 }
1301 })
1302 .collect();
1303 system_package_objs = match self.multi_download(&previous_ver_refs).await {
1304 Ok(packages) => packages,
1305 Err(ReplayEngineError::ObjectNotExist { id }) => {
1306 warn!(
1310 "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1311 id
1312 );
1313 break;
1314 }
1315 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1316 warn!(
1320 "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1321 id, version
1322 );
1323 break;
1324 }
1325 Err(ReplayEngineError::ObjectVersionTooHigh {
1326 id,
1327 asked_version,
1328 latest_version,
1329 }) => {
1330 warn!(
1331 "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1332 id, asked_version, latest_version
1333 );
1334 break;
1335 }
1336 Err(ReplayEngineError::ObjectDeleted {
1337 id,
1338 version,
1339 digest,
1340 }) => {
1341 warn!(
1345 "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1346 id, version, digest
1347 );
1348 break;
1349 }
1350 Err(e) => return Err(e),
1351 };
1352 }
1353 Ok(mapping)
1354 }
1355
1356 pub async fn get_protocol_config(
1357 &self,
1358 epoch_id: EpochId,
1359 chain: Chain,
1360 ) -> Result<ProtocolConfig, ReplayEngineError> {
1361 match self.protocol_version {
1362 Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1363 Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1364 None => self
1365 .protocol_version_epoch_table
1366 .iter()
1367 .rev()
1368 .find(|(_, rg)| epoch_id >= rg.epoch_start)
1369 .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1370 .unwrap_or_else(|| {
1371 Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1372 }),
1373 }
1374 }
1375
1376 pub async fn checkpoints_for_epoch(
1377 &self,
1378 epoch_id: u64,
1379 ) -> Result<(u64, u64), ReplayEngineError> {
1380 let epoch_change_events = self
1381 .fetcher
1382 .get_epoch_change_events(true)
1383 .await?
1384 .into_iter()
1385 .collect::<Vec<_>>();
1386 let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1387 (0, 1)
1388 } else {
1389 let idx = epoch_change_events
1390 .iter()
1391 .position(|ev| match extract_epoch_and_version(ev.clone()) {
1392 Ok((epoch, _)) => epoch == epoch_id,
1393 Err(_) => false,
1394 })
1395 .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1396 let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1397 (
1398 self.fetcher
1399 .get_transaction(&epoch_change_tx)
1400 .await?
1401 .checkpoint
1402 .unwrap_or_else(|| {
1403 panic!(
1404 "Checkpoint for transaction {epoch_change_tx} not present. Could be due to pruning"
1405 )
1406 }),
1407 idx,
1408 )
1409 };
1410
1411 let next_epoch_change_tx = epoch_change_events
1412 .get(start_epoch_idx + 1)
1413 .map(|v| v.id.tx_digest)
1414 .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1415
1416 let next_epoch_checkpoint = self
1417 .fetcher
1418 .get_transaction(&next_epoch_change_tx)
1419 .await?
1420 .checkpoint
1421 .unwrap_or_else(|| {
1422 panic!(
1423 "Checkpoint for transaction {next_epoch_change_tx} not present. Could be due to pruning"
1424 )
1425 });
1426
1427 Ok((start_checkpoint, next_epoch_checkpoint - 1))
1428 }
1429
1430 pub async fn get_epoch_start_timestamp_and_rgp(
1431 &self,
1432 epoch_id: u64,
1433 tx_digest: &TransactionDigest,
1434 ) -> Result<(u64, u64), ReplayEngineError> {
1435 if epoch_id == 0 {
1436 return Err(ReplayEngineError::TransactionNotSupported {
1437 digest: *tx_digest,
1438 reason: "Transactions from epoch 0 not supported".to_string(),
1439 });
1440 }
1441 self.fetcher
1442 .get_epoch_start_timestamp_and_rgp(epoch_id)
1443 .await
1444 }
1445
1446 fn add_config_objects_if_needed(
1447 &self,
1448 status: &IotaExecutionStatus,
1449 ) -> Vec<(ObjectID, SequenceNumber)> {
1450 match parse_effect_error_for_denied_coins(status) {
1451 Some(coin_type) => {
1452 let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1453 panic!(
1454 "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1455 );
1456 };
1457 if !config_id_and_version
1459 .iter()
1460 .any(|(id, _)| id == &IOTA_DENY_LIST_OBJECT_ID)
1461 {
1462 let deny_list_oid_version = self.download_latest_object(&IOTA_DENY_LIST_OBJECT_ID)
1463 .ok()
1464 .flatten()
1465 .expect("Unable to download the deny list object for a transaction that requires it")
1466 .version();
1467 config_id_and_version.push((IOTA_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1468 }
1469 config_id_and_version
1470 }
1471 None => vec![],
1472 }
1473 }
1474
1475 async fn resolve_tx_components(
1476 &self,
1477 tx_digest: &TransactionDigest,
1478 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1479 assert!(self.is_remote_replay());
1480 let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1482 let sender = match tx_info.clone().transaction.unwrap().data {
1483 iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.sender,
1484 };
1485 let IotaTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1486
1487 let config_objects = self.add_config_objects_if_needed(effects.status());
1488
1489 let raw_tx_bytes = tx_info.clone().raw_transaction;
1490 let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1491 let input_objs = orig_tx
1492 .transaction_data()
1493 .input_objects()
1494 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1495 let tx_kind_orig = orig_tx.transaction_data().kind();
1496
1497 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1499
1500 let shared_object_refs: Vec<ObjectRef> = effects
1501 .shared_objects()
1502 .iter()
1503 .map(|so_ref| {
1504 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1505 unimplemented!(
1506 "Replay of deleted shared object transactions is not supported yet"
1507 );
1508 } else {
1509 so_ref.to_object_ref()
1510 }
1511 })
1512 .collect();
1513 let gas_data = match tx_info.clone().transaction.unwrap().data {
1514 iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.gas_data,
1515 };
1516 let gas_object_refs: Vec<_> = gas_data
1517 .payment
1518 .iter()
1519 .map(|obj_ref| obj_ref.to_object_ref())
1520 .collect();
1521 let receiving_objs = orig_tx
1522 .transaction_data()
1523 .receiving_objects()
1524 .into_iter()
1525 .map(|(obj_id, version, _)| (obj_id, version))
1526 .collect();
1527
1528 let epoch_id = effects.executed_epoch;
1529 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1530
1531 let (epoch_start_timestamp, reference_gas_price) = self
1533 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1534 .await?;
1535
1536 Ok(OnChainTransactionInfo {
1537 kind: tx_kind_orig.clone(),
1538 sender,
1539 modified_at_versions,
1540 input_objects: input_objs,
1541 shared_object_refs,
1542 gas: gas_object_refs,
1543 gas_budget: gas_data.budget,
1544 gas_price: gas_data.price,
1545 executed_epoch: epoch_id,
1546 dependencies: effects.dependencies().to_vec(),
1547 effects: IotaTransactionBlockEffects::V1(effects),
1548 receiving_objs,
1549 config_objects,
1550 protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1554 tx_digest: *tx_digest,
1555 epoch_start_timestamp,
1556 sender_signed_data: orig_tx.clone(),
1557 reference_gas_price,
1558 chain,
1559 })
1560 }
1561
1562 async fn resolve_tx_components_from_dump(
1563 &self,
1564 tx_digest: &TransactionDigest,
1565 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1566 assert!(!self.is_remote_replay());
1567
1568 let dp = self.fetcher.as_node_state_dump();
1569
1570 let sender = dp
1571 .node_state_dump
1572 .sender_signed_data
1573 .transaction_data()
1574 .sender();
1575 let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1576 let effects = dp.node_state_dump.computed_effects.clone();
1577 let effects = IotaTransactionBlockEffects::try_from(effects).unwrap();
1578 let config_objects = self.add_config_objects_if_needed(effects.status());
1581
1582 let input_objs = orig_tx
1586 .transaction_data()
1587 .input_objects()
1588 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1589 let tx_kind_orig = orig_tx.transaction_data().kind();
1590
1591 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1593
1594 let shared_object_refs: Vec<ObjectRef> = effects
1595 .shared_objects()
1596 .iter()
1597 .map(|so_ref| {
1598 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1599 unimplemented!(
1600 "Replay of deleted shared object transactions is not supported yet"
1601 );
1602 } else {
1603 so_ref.to_object_ref()
1604 }
1605 })
1606 .collect();
1607 let gas_data = orig_tx.transaction_data().gas_data();
1608 let gas_object_refs: Vec<_> = gas_data.clone().payment;
1609 let receiving_objs = orig_tx
1610 .transaction_data()
1611 .receiving_objects()
1612 .into_iter()
1613 .map(|(obj_id, version, _)| (obj_id, version))
1614 .collect();
1615
1616 let epoch_id = dp.node_state_dump.executed_epoch;
1617
1618 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1619
1620 let protocol_config =
1621 ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1622 let (epoch_start_timestamp, reference_gas_price) = self
1624 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1625 .await?;
1626
1627 Ok(OnChainTransactionInfo {
1628 kind: tx_kind_orig.clone(),
1629 sender,
1630 modified_at_versions,
1631 input_objects: input_objs,
1632 shared_object_refs,
1633 gas: gas_object_refs,
1634 gas_budget: gas_data.budget,
1635 gas_price: gas_data.price,
1636 executed_epoch: epoch_id,
1637 dependencies: effects.dependencies().to_vec(),
1638 effects,
1639 receiving_objs,
1640 config_objects,
1641 protocol_version: protocol_config.version,
1642 tx_digest: *tx_digest,
1643 epoch_start_timestamp,
1644 sender_signed_data: orig_tx.clone(),
1645 reference_gas_price,
1646 chain,
1647 })
1648 }
1649
1650 async fn resolve_download_input_objects(
1651 &mut self,
1652 tx_info: &OnChainTransactionInfo,
1653 deleted_shared_objects: Vec<ObjectRef>,
1654 ) -> Result<InputObjects, ReplayEngineError> {
1655 let mut package_inputs = vec![];
1657 let mut imm_owned_inputs = vec![];
1658 let mut shared_inputs = vec![];
1659 let mut deleted_shared_info_map = BTreeMap::new();
1660
1661 if !deleted_shared_objects.is_empty() {
1665 for tx_digest in tx_info.dependencies.iter() {
1666 let tx_info = self.resolve_tx_components(tx_digest).await?;
1667 for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1668 deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1669 }
1670 }
1671 }
1672
1673 tx_info
1674 .input_objects
1675 .iter()
1676 .map(|kind| match kind {
1677 InputObjectKind::MovePackage(i) => {
1678 package_inputs.push(*i);
1679 Ok(())
1680 }
1681 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1682 imm_owned_inputs.push((o_ref.0, o_ref.1));
1683 Ok(())
1684 }
1685 InputObjectKind::SharedMoveObject {
1686 id,
1687 initial_shared_version: _,
1688 mutable: _,
1689 } if !deleted_shared_info_map.contains_key(id) => {
1690 if let Some(o) = self
1692 .storage
1693 .live_objects_store
1694 .lock()
1695 .expect("Can't lock")
1696 .get(id)
1697 {
1698 shared_inputs.push(o.clone());
1699 Ok(())
1700 } else {
1701 Err(ReplayEngineError::InternalCacheInvariantViolation {
1702 id: *id,
1703 version: None,
1704 })
1705 }
1706 }
1707 _ => Ok(()),
1708 })
1709 .collect::<Result<Vec<_>, _>>()?;
1710
1711 let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1713
1714 in_objs.extend(
1717 self.multi_download_relevant_packages_and_store(
1718 package_inputs,
1719 tx_info.protocol_version.as_u64(),
1720 )
1721 .await?,
1722 );
1723 in_objs.extend(shared_inputs);
1725
1726 let resolved_input_objs = tx_info
1729 .input_objects
1730 .iter()
1731 .flat_map(|kind| match kind {
1732 InputObjectKind::MovePackage(i) => {
1733 Some(ObjectReadResult::new(
1735 *kind,
1736 self.storage
1737 .package_cache
1738 .lock()
1739 .expect("Cannot lock")
1740 .get(i)
1741 .unwrap_or(
1742 &self
1743 .download_latest_object(i)
1744 .expect("Object download failed")
1745 .expect("Object not found on chain"),
1746 )
1747 .clone()
1748 .into(),
1749 ))
1750 }
1751 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1752 *kind,
1753 self.storage
1754 .object_version_cache
1755 .lock()
1756 .expect("Cannot lock")
1757 .get(&(o_ref.0, o_ref.1))
1758 .unwrap()
1759 .clone()
1760 .into(),
1761 )),
1762 InputObjectKind::SharedMoveObject { id, .. }
1763 if !deleted_shared_info_map.contains_key(id) =>
1764 {
1765 Some(ObjectReadResult::new(
1767 *kind,
1768 self.storage
1769 .live_objects_store
1770 .lock()
1771 .expect("Can't lock")
1772 .get(id)
1773 .unwrap()
1774 .clone()
1775 .into(),
1776 ))
1777 }
1778 InputObjectKind::SharedMoveObject { id, .. } => {
1779 let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1780 Some(ObjectReadResult::new(
1781 *kind,
1782 ObjectReadResultKind::DeletedSharedObject(*version, *digest),
1783 ))
1784 }
1785 })
1786 .collect();
1787
1788 Ok(InputObjects::new(resolved_input_objs))
1789 }
1790
1791 async fn initialize_execution_env_state(
1794 &mut self,
1795 tx_info: &OnChainTransactionInfo,
1796 ) -> Result<InputObjects, ReplayEngineError> {
1797 self.current_protocol_version = tx_info.protocol_version.as_u64();
1799
1800 self.multi_download_and_store(&tx_info.modified_at_versions)
1802 .await?;
1803
1804 let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1805 .shared_object_refs
1806 .iter()
1807 .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1808
1809 let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1811 self.multi_download_and_store(&shared_refs).await?;
1812
1813 let gas_refs: Vec<_> = tx_info
1816 .gas
1817 .iter()
1818 .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
1819 .collect();
1820 self.multi_download_and_store(&gas_refs).await?;
1821
1822 let input_objs = self
1824 .resolve_download_input_objects(tx_info, deleted_shared_refs)
1825 .await?;
1826
1827 self.multi_download_and_store(&tx_info.receiving_objs)
1829 .await?;
1830
1831 self.multi_download_and_store(&tx_info.config_objects)
1833 .await?;
1834
1835 let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1839 self.multi_download_and_store(&loaded_child_refs).await?;
1840 tokio::task::yield_now().await;
1841
1842 Ok(input_objs)
1843 }
1844}
1845
1846impl BackingPackageStore for LocalExec {
1850 fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1854 fn inner(self_: &LocalExec, package_id: &ObjectID) -> IotaResult<Option<Object>> {
1855 self_
1857 .get_or_download_object(package_id, true )
1858 .map_err(|e| IotaError::Storage(e.to_string()))
1859 }
1860
1861 let res = inner(self, package_id);
1862 self.exec_store_events
1863 .lock()
1864 .expect("Unable to lock events list")
1865 .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1866 package_id: *package_id,
1867 result: res.clone(),
1868 });
1869 res.map(|o| o.map(PackageObject::new))
1870 }
1871}
1872
1873impl ChildObjectResolver for LocalExec {
1874 fn read_child_object(
1877 &self,
1878 parent: &ObjectID,
1879 child: &ObjectID,
1880 child_version_upper_bound: SequenceNumber,
1881 ) -> IotaResult<Option<Object>> {
1882 fn inner(
1883 self_: &LocalExec,
1884 parent: &ObjectID,
1885 child: &ObjectID,
1886 child_version_upper_bound: SequenceNumber,
1887 ) -> IotaResult<Option<Object>> {
1888 let child_object =
1889 match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1890 None => return Ok(None),
1891 Some(o) => o,
1892 };
1893 let child_version = child_object.version();
1894 if child_object.version() > child_version_upper_bound {
1895 return Err(IotaError::Unknown(format!(
1896 "Invariant Violation. Replay loaded child_object {child} at version \
1897 {child_version} but expected the version to be <= {child_version_upper_bound}"
1898 )));
1899 }
1900 let parent = *parent;
1901 if child_object.owner != Owner::ObjectOwner(parent.into()) {
1902 return Err(IotaError::InvalidChildObjectAccess {
1903 object: *child,
1904 given_parent: parent,
1905 actual_owner: child_object.owner,
1906 });
1907 }
1908 Ok(Some(child_object))
1909 }
1910
1911 let res = inner(self, parent, child, child_version_upper_bound);
1912 self.exec_store_events
1913 .lock()
1914 .expect("Unable to lock events list")
1915 .push(
1916 ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1917 parent: *parent,
1918 child: *child,
1919 result: res.clone(),
1920 },
1921 );
1922 res
1923 }
1924
1925 fn get_object_received_at_version(
1926 &self,
1927 owner: &ObjectID,
1928 receiving_object_id: &ObjectID,
1929 receive_object_at_version: SequenceNumber,
1930 _epoch_id: EpochId,
1931 ) -> IotaResult<Option<Object>> {
1932 fn inner(
1933 self_: &LocalExec,
1934 owner: &ObjectID,
1935 receiving_object_id: &ObjectID,
1936 receive_object_at_version: SequenceNumber,
1937 ) -> IotaResult<Option<Object>> {
1938 let recv_object = match self_.get_object(receiving_object_id)? {
1939 None => return Ok(None),
1940 Some(o) => o,
1941 };
1942 if recv_object.version() != receive_object_at_version {
1943 return Err(IotaError::Unknown(format!(
1944 "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
1945 {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
1946 )));
1947 }
1948 if recv_object.owner != Owner::AddressOwner((*owner).into()) {
1949 return Ok(None);
1950 }
1951 Ok(Some(recv_object))
1952 }
1953
1954 let res = inner(self, owner, receiving_object_id, receive_object_at_version);
1955 self.exec_store_events
1956 .lock()
1957 .expect("Unable to lock events list")
1958 .push(ExecutionStoreEvent::ReceiveObject {
1959 owner: *owner,
1960 receive: *receiving_object_id,
1961 receive_at_version: receive_object_at_version,
1962 result: res.clone(),
1963 });
1964 res
1965 }
1966}
1967
1968impl ResourceResolver for LocalExec {
1969 type Error = IotaError;
1970
1971 fn get_resource(
1975 &self,
1976 address: &AccountAddress,
1977 typ: &StructTag,
1978 ) -> IotaResult<Option<Vec<u8>>> {
1979 fn inner(
1980 self_: &LocalExec,
1981 address: &AccountAddress,
1982 typ: &StructTag,
1983 ) -> IotaResult<Option<Vec<u8>>> {
1984 let Some(object) = self_.get_or_download_object(
1986 &ObjectID::from(*address),
1987 false, )?
1989 else {
1990 return Ok(None);
1991 };
1992
1993 match &object.data {
1994 Data::Move(m) => {
1995 assert!(
1996 m.is_type(typ),
1997 "Invariant violation: ill-typed object in storage \
1998 or bad object request from caller"
1999 );
2000 Ok(Some(m.contents().to_vec()))
2001 }
2002 other => unimplemented!(
2003 "Bad object lookup: expected Move object, but got {:#?}",
2004 other
2005 ),
2006 }
2007 }
2008
2009 let res = inner(self, address, typ);
2010 self.exec_store_events
2011 .lock()
2012 .expect("Unable to lock events list")
2013 .push(ExecutionStoreEvent::ResourceResolverGetResource {
2014 address: *address,
2015 typ: typ.clone(),
2016 result: res.clone(),
2017 });
2018 res
2019 }
2020}
2021
2022impl ModuleResolver for LocalExec {
2023 type Error = IotaError;
2024
2025 fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2028 fn inner(self_: &LocalExec, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2029 get_module(self_, module_id)
2030 }
2031
2032 let res = inner(self, module_id);
2033 self.exec_store_events
2034 .lock()
2035 .expect("Unable to lock events list")
2036 .push(ExecutionStoreEvent::ModuleResolverGetModule {
2037 module_id: module_id.clone(),
2038 result: res.clone(),
2039 });
2040 res
2041 }
2042}
2043
2044impl ModuleResolver for &mut LocalExec {
2045 type Error = IotaError;
2046
2047 fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2048 (**self).get_module(module_id)
2051 }
2052}
2053
2054impl ObjectStore for LocalExec {
2055 fn get_object(
2058 &self,
2059 object_id: &ObjectID,
2060 ) -> iota_types::storage::error::Result<Option<Object>> {
2061 let res = self
2062 .storage
2063 .live_objects_store
2064 .lock()
2065 .expect("Can't lock")
2066 .get(object_id)
2067 .cloned();
2068 self.exec_store_events
2069 .lock()
2070 .expect("Unable to lock events list")
2071 .push(ExecutionStoreEvent::ObjectStoreGetObject {
2072 object_id: *object_id,
2073 result: Ok(res.clone()),
2074 });
2075 Ok(res)
2076 }
2077
2078 fn get_object_by_key(
2081 &self,
2082 object_id: &ObjectID,
2083 version: VersionNumber,
2084 ) -> iota_types::storage::error::Result<Option<Object>> {
2085 let res = self
2086 .storage
2087 .live_objects_store
2088 .lock()
2089 .expect("Can't lock")
2090 .get(object_id)
2091 .and_then(|obj| {
2092 if obj.version() == version {
2093 Some(obj.clone())
2094 } else {
2095 None
2096 }
2097 });
2098
2099 self.exec_store_events
2100 .lock()
2101 .expect("Unable to lock events list")
2102 .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2103 object_id: *object_id,
2104 version,
2105 result: Ok(res.clone()),
2106 });
2107
2108 Ok(res)
2109 }
2110}
2111
2112impl ObjectStore for &mut LocalExec {
2113 fn get_object(
2114 &self,
2115 object_id: &ObjectID,
2116 ) -> iota_types::storage::error::Result<Option<Object>> {
2117 (**self).get_object(object_id)
2120 }
2121
2122 fn get_object_by_key(
2123 &self,
2124 object_id: &ObjectID,
2125 version: VersionNumber,
2126 ) -> iota_types::storage::error::Result<Option<Object>> {
2127 (**self).get_object_by_key(object_id, version)
2130 }
2131}
2132
2133impl GetModule for LocalExec {
2134 type Error = IotaError;
2135 type Item = CompiledModule;
2136
2137 fn get_module_by_id(&self, id: &ModuleId) -> IotaResult<Option<Self::Item>> {
2138 let res = get_module_by_id(self, id);
2139
2140 self.exec_store_events
2141 .lock()
2142 .expect("Unable to lock events list")
2143 .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2144 id: id.clone(),
2145 result: res.clone(),
2146 });
2147 res
2148 }
2149}
2150
2151pub fn get_executor(
2154 executor_version_override: Option<i64>,
2155 protocol_config: &ProtocolConfig,
2156 _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2157 enable_profiler: Option<PathBuf>,
2158) -> Arc<dyn Executor + Send + Sync> {
2159 let protocol_config = executor_version_override
2160 .map(|q| {
2161 let ver = if q < 0 {
2162 ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2163 } else {
2164 q as u64
2165 };
2166
2167 let mut c = protocol_config.clone();
2168 c.set_execution_version_for_testing(ver);
2169 c
2170 })
2171 .unwrap_or(protocol_config.clone());
2172
2173 let silent = true;
2174 iota_execution::executor(&protocol_config, silent, enable_profiler)
2175 .expect("Creating an executor should not fail here")
2176}
2177
2178fn parse_effect_error_for_denied_coins(status: &IotaExecutionStatus) -> Option<String> {
2179 let IotaExecutionStatus::Failure { error } = status else {
2180 return None;
2181 };
2182 parse_denied_error_string(error)
2183}
2184
2185fn parse_denied_error_string(error: &str) -> Option<String> {
2186 let regulated_regex = regex::Regex::new(
2187 r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2188 )
2189 .unwrap();
2190
2191 let caps = regulated_regex.captures(error)?;
2192 Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2193}
2194
#[cfg(test)]
mod tests {
    use super::parse_denied_error_string;

    /// Both regulated-coin error shapes must yield the same quoted coin type.
    #[test]
    fn test_regex_regulated_coin_errors() {
        let expected_string =
            "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";
        let test_bank = [
            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
        ];

        for test in test_bank.iter() {
            assert!(parse_denied_error_string(test).unwrap() == expected_string);
        }
    }
}