1use std::{
6 collections::{BTreeMap, HashSet},
7 path::PathBuf,
8 sync::{Arc, Mutex},
9};
10
11use futures::executor::block_on;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_core::authority::NodeStateDump;
14use iota_execution::Executor;
15use iota_framework::BuiltInFramework;
16use iota_json_rpc_types::{
17 IotaExecutionStatus, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI,
18};
19use iota_protocol_config::{Chain, ProtocolConfig};
20use iota_sdk::{IotaClient, IotaClientBuilder};
21use iota_types::{
22 IOTA_DENY_LIST_OBJECT_ID,
23 base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
24 committee::EpochId,
25 digests::{ObjectDigest, TransactionDigest},
26 error::{ExecutionError, IotaError, IotaResult},
27 executable_transaction::VerifiedExecutableTransaction,
28 gas::IotaGasStatus,
29 in_memory_storage::InMemoryStorage,
30 inner_temporary_store::InnerTemporaryStore,
31 message_envelope::Message,
32 metrics::LimitsMetrics,
33 object::{Data, Object, Owner},
34 storage::{
35 BackingPackageStore, ChildObjectResolver, ObjectStore, PackageObject, get_module,
36 get_module_by_id,
37 },
38 transaction::{
39 CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
40 SenderSignedData, Transaction, TransactionDataAPI, TransactionKind,
41 TransactionKind::ProgrammableTransaction, VerifiedTransaction,
42 },
43};
44use move_binary_format::CompiledModule;
45use move_bytecode_utils::module_cache::GetModule;
46use move_core_types::{
47 account_address::AccountAddress,
48 language_storage::{ModuleId, StructTag},
49 resolver::{ModuleResolver, ResourceResolver},
50};
51use prometheus::Registry;
52use serde::{Deserialize, Serialize};
53use similar::{ChangeTag, TextDiff};
54use tracing::{error, info, trace, warn};
55
56use crate::{
57 chain_from_chain_id,
58 data_fetcher::{
59 DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
60 },
61 displays::{
62 Pretty,
63 transaction_displays::{FullPTB, transform_command_results_to_annotated},
64 },
65 types::*,
66};
67
68#[derive(Debug, Serialize, Deserialize)]
71pub struct ExecutionSandboxState {
72 pub transaction_info: OnChainTransactionInfo,
74 pub required_objects: Vec<Object>,
76 #[serde(skip)]
79 pub local_exec_temporary_store: Option<InnerTemporaryStore>,
80 pub local_exec_effects: IotaTransactionBlockEffects,
82 #[serde(skip)]
84 pub local_exec_status: Option<Result<(), ExecutionError>>,
85}
86
87impl ExecutionSandboxState {
88 pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
89 if self.transaction_info.effects != self.local_exec_effects {
90 error!("Replay tool forked {}", self.transaction_info.tx_digest);
91 let diff = self.diff_effects();
92 println!("{}", diff);
93 return Err(ReplayEngineError::EffectsForked {
94 digest: self.transaction_info.tx_digest,
95 diff: format!("\n{}", diff),
96 on_chain: Box::new(self.transaction_info.effects.clone()),
97 local: Box::new(self.local_exec_effects.clone()),
98 });
99 }
100 Ok(())
101 }
102
103 pub fn diff_effects(&self) -> String {
105 let eff1 = &self.transaction_info.effects;
106 let eff2 = &self.local_exec_effects;
107 let on_chain_str = format!("{:#?}", eff1);
108 let local_chain_str = format!("{:#?}", eff2);
109 let mut res = vec![];
110
111 let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
112 for change in diff.iter_all_changes() {
113 let sign = match change.tag() {
114 ChangeTag::Delete => "---",
115 ChangeTag::Insert => "+++",
116 ChangeTag::Equal => " ",
117 };
118 res.push(format!("{}{}", sign, change));
119 }
120
121 res.join("")
122 }
123}
124
125#[derive(Debug, Clone, PartialEq, Eq)]
126pub struct ProtocolVersionSummary {
127 pub protocol_version: u64,
129 pub epoch_start: u64,
131 pub epoch_end: u64,
133 pub checkpoint_start: Option<u64>,
135 pub checkpoint_end: Option<u64>,
137 pub epoch_change_tx: TransactionDigest,
139}
140
141#[derive(Clone)]
142pub struct Storage {
143 pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
148
149 pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
152 pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
155}
156
157impl std::fmt::Display for Storage {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 writeln!(f, "Live object store")?;
160 for (id, obj) in self
161 .live_objects_store
162 .lock()
163 .expect("Unable to lock")
164 .iter()
165 {
166 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
167 }
168 writeln!(f, "Package cache")?;
169 for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
170 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
171 }
172 writeln!(f, "Object version cache")?;
173 for (id, _) in self
174 .object_version_cache
175 .lock()
176 .expect("Unable to lock")
177 .iter()
178 {
179 writeln!(f, "{}: {}", id.0, id.1)?;
180 }
181
182 write!(f, "")
183 }
184}
185
186impl Storage {
187 pub fn default() -> Self {
188 Self {
189 live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
190 package_cache: Arc::new(Mutex::new(BTreeMap::new())),
191 object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
192 }
193 }
194
195 pub fn all_objects(&self) -> Vec<Object> {
196 self.live_objects_store
197 .lock()
198 .expect("Unable to lock")
199 .values()
200 .cloned()
201 .chain(
202 self.package_cache
203 .lock()
204 .expect("Unable to lock")
205 .iter()
206 .map(|(_, obj)| obj.clone()),
207 )
208 .chain(
209 self.object_version_cache
210 .lock()
211 .expect("Unable to lock")
212 .iter()
213 .map(|(_, obj)| obj.clone()),
214 )
215 .collect::<Vec<_>>()
216 }
217}
218
219#[derive(Clone)]
220pub struct LocalExec {
221 pub client: Option<IotaClient>,
222 pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
225 pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
227 pub current_protocol_version: u64,
229 pub storage: Storage,
231 pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
233 pub metrics: Arc<LimitsMetrics>,
235 pub fetcher: Fetchers,
237
238 pub executor_version: Option<i64>,
241 pub protocol_version: Option<i64>,
245 pub enable_profiler: Option<PathBuf>,
248 pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
249 pub num_retries_for_timeout: u32,
251 pub sleep_period_for_timeout: std::time::Duration,
252}
253
254impl LocalExec {
255 pub async fn multi_download(
258 &self,
259 objs: &[(ObjectID, SequenceNumber)],
260 ) -> Result<Vec<Object>, ReplayEngineError> {
261 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
262 while num_retries_for_timeout >= 0 {
263 match self.fetcher.multi_get_versioned(objs).await {
264 Ok(objs) => return Ok(objs),
265 Err(ReplayEngineError::IotaRpcRequestTimeout) => {
266 warn!(
267 "RPC request timed out. Retries left {}. Sleeping for {}s",
268 num_retries_for_timeout,
269 self.sleep_period_for_timeout.as_secs()
270 );
271 num_retries_for_timeout -= 1;
272 tokio::time::sleep(self.sleep_period_for_timeout).await;
273 }
274 Err(e) => return Err(e),
275 }
276 }
277 Err(ReplayEngineError::IotaRpcRequestTimeout)
278 }
279 pub async fn multi_download_latest(
282 &self,
283 objs: &[ObjectID],
284 ) -> Result<Vec<Object>, ReplayEngineError> {
285 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
286 while num_retries_for_timeout >= 0 {
287 match self.fetcher.multi_get_latest(objs).await {
288 Ok(objs) => return Ok(objs),
289 Err(ReplayEngineError::IotaRpcRequestTimeout) => {
290 warn!(
291 "RPC request timed out. Retries left {}. Sleeping for {}s",
292 num_retries_for_timeout,
293 self.sleep_period_for_timeout.as_secs()
294 );
295 num_retries_for_timeout -= 1;
296 tokio::time::sleep(self.sleep_period_for_timeout).await;
297 }
298 Err(e) => return Err(e),
299 }
300 }
301 Err(ReplayEngineError::IotaRpcRequestTimeout)
302 }
303
304 pub async fn fetch_loaded_child_refs(
305 &self,
306 tx_digest: &TransactionDigest,
307 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
308 self.fetcher.get_loaded_child_objects(tx_digest).await
310 }
311
312 pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
313 Self::new_for_remote(
314 IotaClientBuilder::default()
315 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
316 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
317 .build(http_url)
318 .await?,
319 None,
320 )
321 .await
322 }
323
324 pub async fn replay_with_network_config(
325 rpc_url: String,
326 tx_digest: TransactionDigest,
327 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
328 use_authority: bool,
329 executor_version: Option<i64>,
330 protocol_version: Option<i64>,
331 enable_profiler: Option<PathBuf>,
332 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
333 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
334 info!("Using RPC URL: {}", rpc_url);
335 LocalExec::new_from_fn_url(&rpc_url)
336 .await?
337 .init_for_execution()
338 .await?
339 .execute_transaction(
340 &tx_digest,
341 expensive_safety_check_config,
342 use_authority,
343 executor_version,
344 protocol_version,
345 enable_profiler,
346 config_and_versions,
347 )
348 .await
349 }
350
351 pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
356 self.populate_protocol_version_tables().await?;
357 tokio::task::yield_now().await;
358 Ok(self)
359 }
360
361 pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
362 Self::new_for_remote(
363 self.client.expect("Remote client not initialized"),
364 Some(self.fetcher.into_remote()),
365 )
366 .await?
367 .init_for_execution()
368 .await
369 }
370
371 pub async fn new_for_remote(
372 client: IotaClient,
373 remote_fetcher: Option<RemoteFetcher>,
374 ) -> Result<Self, ReplayEngineError> {
375 let registry = prometheus::Registry::new();
377 let metrics = Arc::new(LimitsMetrics::new(®istry));
378
379 let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
380
381 Ok(Self {
382 client: Some(client),
383 protocol_version_epoch_table: BTreeMap::new(),
384 protocol_version_system_package_table: BTreeMap::new(),
385 current_protocol_version: 0,
386 exec_store_events: Arc::new(Mutex::new(Vec::new())),
387 metrics,
388 storage: Storage::default(),
389 fetcher: Fetchers::Remote(fetcher),
390 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
392 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
393 executor_version: None,
394 protocol_version: None,
395 enable_profiler: None,
396 config_and_versions: None,
397 })
398 }
399
400 pub async fn new_for_state_dump(
401 path: &str,
402 backup_rpc_url: Option<String>,
403 ) -> Result<Self, ReplayEngineError> {
404 let registry = prometheus::Registry::new();
406 let metrics = Arc::new(LimitsMetrics::new(®istry));
407
408 let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
409 let current_protocol_version = state.protocol_version;
410 let fetcher = match backup_rpc_url {
411 Some(url) => NodeStateDumpFetcher::new(
412 state,
413 Some(RemoteFetcher::new(
414 IotaClientBuilder::default()
415 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
416 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
417 .build(url)
418 .await?,
419 )),
420 ),
421 None => NodeStateDumpFetcher::new(state, None),
422 };
423
424 Ok(Self {
425 client: None,
426 protocol_version_epoch_table: BTreeMap::new(),
427 protocol_version_system_package_table: BTreeMap::new(),
428 current_protocol_version,
429 exec_store_events: Arc::new(Mutex::new(Vec::new())),
430 metrics,
431 storage: Storage::default(),
432 fetcher: Fetchers::NodeStateDump(fetcher),
433 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
435 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
436 executor_version: None,
437 protocol_version: None,
438 enable_profiler: None,
439 config_and_versions: None,
440 })
441 }
442
443 pub async fn multi_download_and_store(
444 &mut self,
445 objs: &[(ObjectID, SequenceNumber)],
446 ) -> Result<Vec<Object>, ReplayEngineError> {
447 let objs = self.multi_download(objs).await?;
448
449 for obj in objs.iter() {
451 let o_ref = obj.compute_object_reference();
452 self.storage
453 .live_objects_store
454 .lock()
455 .expect("Can't lock")
456 .insert(o_ref.0, obj.clone());
457 self.storage
458 .object_version_cache
459 .lock()
460 .expect("Cannot lock")
461 .insert((o_ref.0, o_ref.1), obj.clone());
462 if obj.is_package() {
463 self.storage
464 .package_cache
465 .lock()
466 .expect("Cannot lock")
467 .insert(o_ref.0, obj.clone());
468 }
469 }
470 tokio::task::yield_now().await;
471 Ok(objs)
472 }
473
474 pub async fn multi_download_relevant_packages_and_store(
475 &mut self,
476 objs: Vec<ObjectID>,
477 protocol_version: u64,
478 ) -> Result<Vec<Object>, ReplayEngineError> {
479 let syst_packages = self.system_package_versions_for_protocol_version(protocol_version)?;
480 let syst_packages_objs = self.multi_download(&syst_packages).await?;
481
482 let non_system_package_objs: Vec<_> = objs
485 .into_iter()
486 .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
487 .collect();
488 let objs = self
489 .multi_download_latest(&non_system_package_objs)
490 .await?
491 .into_iter()
492 .chain(syst_packages_objs.into_iter());
493
494 for obj in objs.clone() {
495 let o_ref = obj.compute_object_reference();
496 self.storage
499 .object_version_cache
500 .lock()
501 .expect("Cannot lock")
502 .insert((o_ref.0, o_ref.1), obj.clone());
503 if obj.is_package() {
504 self.storage
505 .package_cache
506 .lock()
507 .expect("Cannot lock")
508 .insert(o_ref.0, obj.clone());
509 }
510 }
511 Ok(objs.collect())
512 }
513
514 #[expect(clippy::disallowed_methods)]
516 pub fn download_object(
517 &self,
518 object_id: &ObjectID,
519 version: SequenceNumber,
520 ) -> Result<Object, ReplayEngineError> {
521 if self
522 .storage
523 .object_version_cache
524 .lock()
525 .expect("Cannot lock")
526 .contains_key(&(*object_id, version))
527 {
528 return Ok(self
529 .storage
530 .object_version_cache
531 .lock()
532 .expect("Cannot lock")
533 .get(&(*object_id, version))
534 .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
535 id: *object_id,
536 version: Some(version),
537 })?
538 .clone());
539 }
540
541 let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
542 q.pop().unwrap_or_else(|| {
543 panic!(
544 "Downloaded obj response cannot be empty {:?}",
545 (*object_id, version)
546 )
547 })
548 })?;
549
550 let o_ref = o.compute_object_reference();
551 self.storage
552 .object_version_cache
553 .lock()
554 .expect("Cannot lock")
555 .insert((o_ref.0, o_ref.1), o.clone());
556 Ok(o)
557 }
558
559 #[expect(clippy::disallowed_methods)]
561 pub fn download_latest_object(
562 &self,
563 object_id: &ObjectID,
564 ) -> Result<Option<Object>, ReplayEngineError> {
565 let resp = block_on({
566 self.multi_download_latest(&[*object_id])
568 })
569 .map(|mut q| {
570 q.pop()
571 .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
572 });
573
574 match resp {
575 Ok(v) => Ok(Some(v)),
576 Err(ReplayEngineError::ObjectNotExist { id }) => {
577 error!(
578 "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
579 );
580 Ok(None)
581 }
582 Err(ReplayEngineError::ObjectDeleted {
583 id,
584 version,
585 digest,
586 }) => {
587 error!("Object {id} {version} {digest} was deleted on RPC server.");
588 Ok(None)
589 }
590 Err(err) => Err(ReplayEngineError::IotaRpcError {
591 err: err.to_string(),
592 }),
593 }
594 }
595
596 #[expect(clippy::disallowed_methods)]
597 pub fn download_object_by_upper_bound(
598 &self,
599 object_id: &ObjectID,
600 version_upper_bound: VersionNumber,
601 ) -> Result<Option<Object>, ReplayEngineError> {
602 let local_object = self
603 .storage
604 .live_objects_store
605 .lock()
606 .expect("Can't lock")
607 .get(object_id)
608 .cloned();
609 if local_object.is_some() {
610 return Ok(local_object);
611 }
612 let response = block_on({
613 self.fetcher
614 .get_child_object(object_id, version_upper_bound)
615 });
616 match response {
617 Ok(object) => {
618 let obj_ref = object.compute_object_reference();
619 self.storage
620 .live_objects_store
621 .lock()
622 .expect("Can't lock")
623 .insert(*object_id, object.clone());
624 self.storage
625 .object_version_cache
626 .lock()
627 .expect("Can't lock")
628 .insert((obj_ref.0, obj_ref.1), object.clone());
629 Ok(Some(object))
630 }
631 Err(ReplayEngineError::ObjectNotExist { id }) => {
632 error!(
633 "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
634 );
635 Ok(None)
636 }
637 Err(ReplayEngineError::ObjectDeleted {
638 id,
639 version,
640 digest,
641 }) => {
642 error!("Object {id} {version} {digest} was deleted on RPC server.");
643 Ok(None)
644 }
645 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
648 info!(
649 "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
650 );
651 Ok(None)
652 }
653 Err(err) => Err(ReplayEngineError::IotaRpcError {
654 err: err.to_string(),
655 }),
656 }
657 }
658
659 pub async fn get_checkpoint_txs(
660 &self,
661 checkpoint_id: u64,
662 ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
663 self.fetcher
664 .get_checkpoint_txs(checkpoint_id)
665 .await
666 .map_err(|e| ReplayEngineError::IotaRpcError { err: e.to_string() })
667 }
668
669 pub async fn execute_all_in_checkpoints(
670 &mut self,
671 checkpoint_ids: &[u64],
672 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
673 terminate_early: bool,
674 use_authority: bool,
675 ) -> Result<(u64, u64), ReplayEngineError> {
676 let mut txs = Vec::new();
678 for checkpoint_id in checkpoint_ids {
679 txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
680 }
681 let num = txs.len();
682 let mut succeeded = 0;
683 for tx in txs {
684 match self
685 .execute_transaction(
686 &tx,
687 expensive_safety_check_config.clone(),
688 use_authority,
689 None,
690 None,
691 None,
692 None,
693 )
694 .await
695 .map(|q| q.check_effects())
696 {
697 Err(e) | Ok(Err(e)) => {
698 if terminate_early {
699 return Err(e);
700 }
701 error!("Error executing tx: {}, {:#?}", tx, e);
702 continue;
703 }
704 _ => (),
705 }
706
707 succeeded += 1;
708 }
709 Ok((succeeded, num as u64))
710 }
711
712 pub async fn execution_engine_execute_with_tx_info_impl(
713 &mut self,
714 tx_info: &OnChainTransactionInfo,
715 override_transaction_kind: Option<TransactionKind>,
716 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
717 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
718 let tx_digest = &tx_info.tx_digest;
719 if tx_info.sender_signed_data.transaction_data().is_system_tx() {
721 warn!(
722 "System TX replay not supported: {}, skipping transaction",
723 tx_digest
724 );
725 return Err(ReplayEngineError::TransactionNotSupported {
726 digest: *tx_digest,
727 reason: "System transaction".to_string(),
728 });
729 }
730
731 let input_objects = self.initialize_execution_env_state(tx_info).await?;
734 assert_eq!(
735 &input_objects.filter_shared_objects().len(),
736 &tx_info.shared_object_refs.len()
737 );
738 let protocol_config =
743 &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);
744
745 let metrics = self.metrics.clone();
746
747 let ov = self.executor_version;
748
749 let executor = get_executor(
751 ov,
752 protocol_config,
753 expensive_safety_check_config,
754 self.enable_profiler.clone(),
755 );
756
757 let expensive_checks = true;
759 let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
760 let certificate_deny_set = HashSet::new();
761 let (inner_store, gas_status, effects, result) = if let Ok(gas_status) = IotaGasStatus::new(
762 tx_info.gas_budget,
763 tx_info.gas_price,
764 tx_info.reference_gas_price,
765 protocol_config,
766 ) {
767 executor.execute_transaction_to_effects(
768 &self,
769 protocol_config,
770 metrics.clone(),
771 expensive_checks,
772 &certificate_deny_set,
773 &tx_info.executed_epoch,
774 tx_info.epoch_start_timestamp,
775 CheckedInputObjects::new_for_replay(input_objects.clone()),
776 tx_info.gas.clone(),
777 gas_status,
778 transaction_kind.clone(),
779 tx_info.sender,
780 *tx_digest,
781 &mut None,
782 )
783 } else {
784 unreachable!("Transaction was valid so gas status must be valid");
785 };
786
787 if let Err(err) = self.pretty_print_for_tracing(
788 &gas_status,
789 &executor,
790 tx_info,
791 &transaction_kind,
792 protocol_config,
793 metrics,
794 expensive_checks,
795 input_objects.clone(),
796 ) {
797 error!("Failed to pretty print for tracing: {:?}", err);
798 }
799
800 let all_required_objects = self.storage.all_objects();
801
802 let effects =
803 IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
804
805 Ok(ExecutionSandboxState {
806 transaction_info: tx_info.clone(),
807 required_objects: all_required_objects,
808 local_exec_temporary_store: Some(inner_store),
809 local_exec_effects: effects,
810 local_exec_status: Some(result),
811 })
812 }
813
814 fn pretty_print_for_tracing(
815 &self,
816 gas_status: &IotaGasStatus,
817 executor: &Arc<dyn Executor + Send + Sync>,
818 tx_info: &OnChainTransactionInfo,
819 transaction_kind: &TransactionKind,
820 protocol_config: &ProtocolConfig,
821 metrics: Arc<LimitsMetrics>,
822 expensive_checks: bool,
823 input_objects: InputObjects,
824 ) -> anyhow::Result<()> {
825 trace!(target: "replay_gas_info", "{}", Pretty(gas_status));
826
827 let skip_checks = true;
828 if let ProgrammableTransaction(pt) = transaction_kind {
829 trace!(
830 target: "replay_ptb_info",
831 "{}",
832 Pretty(&FullPTB {
833 ptb: pt.clone(),
834 results: transform_command_results_to_annotated(
835 executor,
836 &self.clone(),
837 executor.dev_inspect_transaction(
838 &self,
839 protocol_config,
840 metrics,
841 expensive_checks,
842 &HashSet::new(),
843 &tx_info.executed_epoch,
844 tx_info.epoch_start_timestamp,
845 CheckedInputObjects::new_for_replay(input_objects),
846 tx_info.gas.clone(),
847 IotaGasStatus::new(
848 tx_info.gas_budget,
849 tx_info.gas_price,
850 tx_info.reference_gas_price,
851 protocol_config,
852 )?,
853 transaction_kind.clone(),
854 tx_info.sender,
855 tx_info.sender_signed_data.digest(),
856 skip_checks,
857 )
858 .3
859 .unwrap_or_default(),
860 )?,
861 }));
862 }
863 Ok(())
864 }
865
866 pub async fn execution_engine_execute_impl(
868 &mut self,
869 tx_digest: &TransactionDigest,
870 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
871 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
872 if self.is_remote_replay() {
873 assert!(
874 !self.protocol_version_system_package_table.is_empty()
875 || !self.protocol_version_epoch_table.is_empty(),
876 "Required tables not populated. Must call `init_for_execution` before executing transactions"
877 );
878 }
879
880 let tx_info = if self.is_remote_replay() {
881 self.resolve_tx_components(tx_digest).await?
882 } else {
883 self.resolve_tx_components_from_dump(tx_digest).await?
884 };
885 self.execution_engine_execute_with_tx_info_impl(
886 &tx_info,
887 None,
888 expensive_safety_check_config,
889 )
890 .await
891 }
892
893 pub async fn certificate_execute_with_sandbox_state(
897 pre_run_sandbox: &ExecutionSandboxState,
898 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
899 let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
901 let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
902 let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
903 let protocol_config = ProtocolConfig::get_for_version(
904 pre_run_sandbox.transaction_info.protocol_version,
905 pre_run_sandbox.transaction_info.chain,
906 );
907 let required_objects = pre_run_sandbox.required_objects.clone();
908 let store = InMemoryStorage::new(required_objects.clone());
909
910 let transaction =
911 Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());
912
913 let input_objects = store.read_input_objects_for_transaction(&transaction);
919 let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
920 VerifiedTransaction::new_unchecked(transaction),
921 executed_epoch,
922 );
923 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
924 &executable,
925 input_objects,
926 &protocol_config,
927 reference_gas_price,
928 )
929 .unwrap();
930 let (kind, signer, gas) = executable.transaction_data().execution_parts();
931 let executor = iota_execution::executor(&protocol_config, true, None).unwrap();
932 let (_, _, effects, exec_res) = executor.execute_transaction_to_effects(
933 &store,
934 &protocol_config,
935 Arc::new(LimitsMetrics::new(&Registry::new())),
936 true,
937 &HashSet::new(),
938 &executed_epoch,
939 epoch_start_timestamp,
940 input_objects,
941 gas,
942 gas_status,
943 kind,
944 signer,
945 *executable.digest(),
946 &mut None,
947 );
948
949 let effects =
950 IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
951
952 Ok(ExecutionSandboxState {
953 transaction_info: pre_run_sandbox.transaction_info.clone(),
954 required_objects,
955 local_exec_temporary_store: None, local_exec_effects: effects,
957 local_exec_status: Some(exec_res),
958 })
959 }
960
961 pub async fn certificate_execute(
965 &mut self,
966 tx_digest: &TransactionDigest,
967 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
968 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
969 let pre_run_sandbox = self
971 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
972 .await?;
973 Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
974 }
975
976 pub async fn execution_engine_execute(
980 &mut self,
981 tx_digest: &TransactionDigest,
982 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
983 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
984 let sandbox_state = self
985 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
986 .await?;
987
988 Ok(sandbox_state)
989 }
990
991 pub async fn execute_state_dump(
992 &mut self,
993 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
994 ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
995 assert!(!self.is_remote_replay());
996
997 let d = match self.fetcher.clone() {
998 Fetchers::NodeStateDump(d) => d,
999 _ => panic!("Invalid fetcher for state dump"),
1000 };
1001 let tx_digest = d.node_state_dump.clone().tx_digest;
1002 let sandbox_state = self
1003 .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1004 .await?;
1005
1006 Ok((sandbox_state, d.node_state_dump))
1007 }
1008
1009 pub async fn execute_transaction(
1010 &mut self,
1011 tx_digest: &TransactionDigest,
1012 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1013 use_authority: bool,
1014 executor_version: Option<i64>,
1015 protocol_version: Option<i64>,
1016 enable_profiler: Option<PathBuf>,
1017 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1018 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1019 self.executor_version = executor_version;
1020 self.protocol_version = protocol_version;
1021 self.enable_profiler = enable_profiler;
1022 self.config_and_versions = config_and_versions;
1023 if use_authority {
1024 self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1025 .await
1026 } else {
1027 self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1028 .await
1029 }
1030 }
1031 fn system_package_ids(_protocol_version: u64) -> Vec<ObjectID> {
1032 BuiltInFramework::all_package_ids()
1033 }
1034
1035 pub fn get_or_download_object(
1037 &self,
1038 obj_id: &ObjectID,
1039 package_expected: bool,
1040 ) -> Result<Option<Object>, ReplayEngineError> {
1041 if package_expected {
1042 if let Some(obj) = self
1043 .storage
1044 .package_cache
1045 .lock()
1046 .expect("Cannot lock")
1047 .get(obj_id)
1048 {
1049 return Ok(Some(obj.clone()));
1050 };
1051 } else if let Some(obj) = self
1058 .storage
1059 .live_objects_store
1060 .lock()
1061 .expect("Can't lock")
1062 .get(obj_id)
1063 {
1064 return Ok(Some(obj.clone()));
1065 }
1066
1067 let Some(o) = self.download_latest_object(obj_id)? else {
1068 return Ok(None);
1069 };
1070
1071 if o.is_package() {
1072 assert!(
1073 package_expected,
1074 "Did not expect package but downloaded object is a package: {obj_id}"
1075 );
1076
1077 self.storage
1078 .package_cache
1079 .lock()
1080 .expect("Cannot lock")
1081 .insert(*obj_id, o.clone());
1082 }
1083 let o_ref = o.compute_object_reference();
1084 self.storage
1085 .object_version_cache
1086 .lock()
1087 .expect("Cannot lock")
1088 .insert((o_ref.0, o_ref.1), o.clone());
1089 Ok(Some(o))
1090 }
1091
1092 pub fn is_remote_replay(&self) -> bool {
1093 matches!(self.fetcher, Fetchers::Remote(_))
1094 }
1095
1096 pub fn system_package_versions_for_protocol_version(
1098 &self,
1099 protocol_version: u64,
1100 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1101 match &self.fetcher {
1102 Fetchers::Remote(_) => Ok(self
1103 .protocol_version_system_package_table
1104 .get(&protocol_version)
1105 .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1106 protocol_version,
1107 })?
1108 .clone()
1109 .into_iter()
1110 .collect()),
1111
1112 Fetchers::NodeStateDump(d) => Ok(d
1113 .node_state_dump
1114 .relevant_system_packages
1115 .iter()
1116 .map(|w| (w.id, w.version, w.digest))
1117 .map(|q| (q.0, q.1))
1118 .collect()),
1119 }
1120 }
1121
1122 pub async fn protocol_ver_to_epoch_map(
1123 &self,
1124 ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
1125 let mut range_map = BTreeMap::new();
1126 let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;
1127
1128 let mut tx_digest = *self
1130 .fetcher
1131 .get_checkpoint_txs(0)
1132 .await?
1133 .first()
1134 .expect("Genesis TX must be in first checkpoint");
1135 let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
1138 (0, 1, Some(0u64));
1139
1140 let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
1141 (start_epoch, start_protocol_version, start_checkpoint);
1142
1143 (start_epoch, start_protocol_version, start_checkpoint) =
1144 (curr_epoch, curr_protocol_version, curr_checkpoint);
1145
1146 let mut end_epoch_tx_digest = tx_digest;
1149
1150 for event in epoch_change_events {
1151 (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
1152 end_epoch_tx_digest = event.id.tx_digest;
1153
1154 if start_protocol_version == curr_protocol_version {
1155 continue;
1157 }
1158
1159 curr_checkpoint = self
1162 .fetcher
1163 .get_transaction(&event.id.tx_digest)
1164 .await?
1165 .checkpoint;
1166 range_map.insert(
1168 start_protocol_version,
1169 ProtocolVersionSummary {
1170 protocol_version: start_protocol_version,
1171 epoch_start: start_epoch,
1172 epoch_end: curr_epoch - 1,
1173 checkpoint_start: start_checkpoint,
1174 checkpoint_end: curr_checkpoint.map(|x| x - 1),
1175 epoch_change_tx: tx_digest,
1176 },
1177 );
1178
1179 start_epoch = curr_epoch;
1180 start_protocol_version = curr_protocol_version;
1181 tx_digest = event.id.tx_digest;
1182 start_checkpoint = curr_checkpoint;
1183 }
1184
1185 range_map.insert(
1187 curr_protocol_version,
1188 ProtocolVersionSummary {
1189 protocol_version: curr_protocol_version,
1190 epoch_start: start_epoch,
1191 epoch_end: curr_epoch,
1192 checkpoint_start: curr_checkpoint,
1193 checkpoint_end: self
1194 .fetcher
1195 .get_transaction(&end_epoch_tx_digest)
1196 .await?
1197 .checkpoint,
1198 epoch_change_tx: tx_digest,
1199 },
1200 );
1201
1202 Ok(range_map)
1203 }
1204
1205 pub fn protocol_version_for_epoch(
1206 epoch: u64,
1207 mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1208 ) -> u64 {
1209 let mut version = 1;
1212 for (k, v) in mp.iter().rev() {
1213 if v.1 <= epoch {
1214 version = *k;
1215 break;
1216 }
1217 }
1218 version
1219 }
1220
1221 pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1222 self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1223
1224 let system_package_revisions = self.system_package_versions().await?;
1225
1226 for (
1229 prot_ver,
1230 ProtocolVersionSummary {
1231 epoch_change_tx: tx_digest,
1232 ..
1233 },
1234 ) in self.protocol_version_epoch_table.clone()
1235 {
1236 let mut working = if prot_ver <= 1 {
1238 BTreeMap::new()
1239 } else {
1240 self.protocol_version_system_package_table
1241 .iter()
1242 .rev()
1243 .find(|(ver, _)| **ver <= prot_ver)
1244 .expect("Prev entry must exist")
1245 .1
1246 .clone()
1247 };
1248
1249 for (id, versions) in system_package_revisions.iter() {
1250 for ver in versions.iter().rev() {
1252 if ver.1 == tx_digest {
1253 working.insert(*id, ver.0);
1255 break;
1256 }
1257 }
1258 }
1259 self.protocol_version_system_package_table
1260 .insert(prot_ver, working);
1261 }
1262 Ok(())
1263 }
1264
1265 pub async fn system_package_versions(
1266 &self,
1267 ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1268 {
1269 let system_package_ids = Self::system_package_ids(
1270 *self
1271 .protocol_version_epoch_table
1272 .keys()
1273 .peekable()
1274 .last()
1275 .expect("Protocol version epoch table not populated"),
1276 );
1277 let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1278
1279 let mut mapping = BTreeMap::new();
1280
1281 while !system_package_objs.is_empty() {
1283 let previous_txs: Vec<_> = system_package_objs
1286 .iter()
1287 .map(|o| (o.compute_object_reference(), o.previous_transaction))
1288 .collect();
1289
1290 previous_txs.iter().for_each(|((id, ver, _), tx)| {
1291 mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1292 });
1293
1294 let previous_ver_refs: Vec<_> = previous_txs
1297 .iter()
1298 .filter_map(|(q, _)| {
1299 let prev_ver = u64::from(q.1) - 1;
1300 if prev_ver == 0 {
1301 None
1302 } else {
1303 Some((q.0, SequenceNumber::from(prev_ver)))
1304 }
1305 })
1306 .collect();
1307 system_package_objs = match self.multi_download(&previous_ver_refs).await {
1308 Ok(packages) => packages,
1309 Err(ReplayEngineError::ObjectNotExist { id }) => {
1310 warn!(
1314 "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1315 id
1316 );
1317 break;
1318 }
1319 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1320 warn!(
1324 "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1325 id, version
1326 );
1327 break;
1328 }
1329 Err(ReplayEngineError::ObjectVersionTooHigh {
1330 id,
1331 asked_version,
1332 latest_version,
1333 }) => {
1334 warn!(
1335 "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1336 id, asked_version, latest_version
1337 );
1338 break;
1339 }
1340 Err(ReplayEngineError::ObjectDeleted {
1341 id,
1342 version,
1343 digest,
1344 }) => {
1345 warn!(
1349 "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1350 id, version, digest
1351 );
1352 break;
1353 }
1354 Err(e) => return Err(e),
1355 };
1356 }
1357 Ok(mapping)
1358 }
1359
1360 pub async fn get_protocol_config(
1361 &self,
1362 epoch_id: EpochId,
1363 chain: Chain,
1364 ) -> Result<ProtocolConfig, ReplayEngineError> {
1365 match self.protocol_version {
1366 Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1367 Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1368 None => self
1369 .protocol_version_epoch_table
1370 .iter()
1371 .rev()
1372 .find(|(_, rg)| epoch_id >= rg.epoch_start)
1373 .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1374 .unwrap_or_else(|| {
1375 Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1376 }),
1377 }
1378 }
1379
1380 pub async fn checkpoints_for_epoch(
1381 &self,
1382 epoch_id: u64,
1383 ) -> Result<(u64, u64), ReplayEngineError> {
1384 let epoch_change_events = self
1385 .fetcher
1386 .get_epoch_change_events(true)
1387 .await?
1388 .into_iter()
1389 .collect::<Vec<_>>();
1390 let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1391 (0, 1)
1392 } else {
1393 let idx = epoch_change_events
1394 .iter()
1395 .position(|ev| match extract_epoch_and_version(ev.clone()) {
1396 Ok((epoch, _)) => epoch == epoch_id,
1397 Err(_) => false,
1398 })
1399 .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1400 let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1401 (
1402 self.fetcher
1403 .get_transaction(&epoch_change_tx)
1404 .await?
1405 .checkpoint
1406 .unwrap_or_else(|| {
1407 panic!(
1408 "Checkpoint for transaction {} not present. Could be due to pruning",
1409 epoch_change_tx
1410 )
1411 }),
1412 idx,
1413 )
1414 };
1415
1416 let next_epoch_change_tx = epoch_change_events
1417 .get(start_epoch_idx + 1)
1418 .map(|v| v.id.tx_digest)
1419 .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1420
1421 let next_epoch_checkpoint = self
1422 .fetcher
1423 .get_transaction(&next_epoch_change_tx)
1424 .await?
1425 .checkpoint
1426 .unwrap_or_else(|| {
1427 panic!(
1428 "Checkpoint for transaction {} not present. Could be due to pruning",
1429 next_epoch_change_tx
1430 )
1431 });
1432
1433 Ok((start_checkpoint, next_epoch_checkpoint - 1))
1434 }
1435
1436 pub async fn get_epoch_start_timestamp_and_rgp(
1437 &self,
1438 epoch_id: u64,
1439 tx_digest: &TransactionDigest,
1440 ) -> Result<(u64, u64), ReplayEngineError> {
1441 if epoch_id == 0 {
1442 return Err(ReplayEngineError::TransactionNotSupported {
1443 digest: *tx_digest,
1444 reason: "Transactions from epoch 0 not supported".to_string(),
1445 });
1446 }
1447 self.fetcher
1448 .get_epoch_start_timestamp_and_rgp(epoch_id)
1449 .await
1450 }
1451
1452 fn add_config_objects_if_needed(
1453 &self,
1454 status: &IotaExecutionStatus,
1455 ) -> Vec<(ObjectID, SequenceNumber)> {
1456 match parse_effect_error_for_denied_coins(status) {
1457 Some(coin_type) => {
1458 let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1459 panic!(
1460 "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1461 );
1462 };
1463 if !config_id_and_version
1465 .iter()
1466 .any(|(id, _)| id == &IOTA_DENY_LIST_OBJECT_ID)
1467 {
1468 let deny_list_oid_version = self.download_latest_object(&IOTA_DENY_LIST_OBJECT_ID)
1469 .ok()
1470 .flatten()
1471 .expect("Unable to download the deny list object for a transaction that requires it")
1472 .version();
1473 config_id_and_version.push((IOTA_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1474 }
1475 config_id_and_version
1476 }
1477 None => vec![],
1478 }
1479 }
1480
1481 async fn resolve_tx_components(
1482 &self,
1483 tx_digest: &TransactionDigest,
1484 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1485 assert!(self.is_remote_replay());
1486 let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1488 let sender = match tx_info.clone().transaction.unwrap().data {
1489 iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.sender,
1490 };
1491 let IotaTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1492
1493 let config_objects = self.add_config_objects_if_needed(effects.status());
1494
1495 let raw_tx_bytes = tx_info.clone().raw_transaction;
1496 let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1497 let input_objs = orig_tx
1498 .transaction_data()
1499 .input_objects()
1500 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1501 let tx_kind_orig = orig_tx.transaction_data().kind();
1502
1503 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1505
1506 let shared_object_refs: Vec<ObjectRef> = effects
1507 .shared_objects()
1508 .iter()
1509 .map(|so_ref| {
1510 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1511 unimplemented!(
1512 "Replay of deleted shared object transactions is not supported yet"
1513 );
1514 } else {
1515 so_ref.to_object_ref()
1516 }
1517 })
1518 .collect();
1519 let gas_data = match tx_info.clone().transaction.unwrap().data {
1520 iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.gas_data,
1521 };
1522 let gas_object_refs: Vec<_> = gas_data
1523 .payment
1524 .iter()
1525 .map(|obj_ref| obj_ref.to_object_ref())
1526 .collect();
1527 let receiving_objs = orig_tx
1528 .transaction_data()
1529 .receiving_objects()
1530 .into_iter()
1531 .map(|(obj_id, version, _)| (obj_id, version))
1532 .collect();
1533
1534 let epoch_id = effects.executed_epoch;
1535 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1536
1537 let (epoch_start_timestamp, reference_gas_price) = self
1539 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1540 .await?;
1541
1542 Ok(OnChainTransactionInfo {
1543 kind: tx_kind_orig.clone(),
1544 sender,
1545 modified_at_versions,
1546 input_objects: input_objs,
1547 shared_object_refs,
1548 gas: gas_object_refs,
1549 gas_budget: gas_data.budget,
1550 gas_price: gas_data.price,
1551 executed_epoch: epoch_id,
1552 dependencies: effects.dependencies().to_vec(),
1553 effects: IotaTransactionBlockEffects::V1(effects),
1554 receiving_objs,
1555 config_objects,
1556 protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1560 tx_digest: *tx_digest,
1561 epoch_start_timestamp,
1562 sender_signed_data: orig_tx.clone(),
1563 reference_gas_price,
1564 chain,
1565 })
1566 }
1567
1568 async fn resolve_tx_components_from_dump(
1569 &self,
1570 tx_digest: &TransactionDigest,
1571 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1572 assert!(!self.is_remote_replay());
1573
1574 let dp = self.fetcher.as_node_state_dump();
1575
1576 let sender = dp
1577 .node_state_dump
1578 .sender_signed_data
1579 .transaction_data()
1580 .sender();
1581 let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1582 let effects = dp.node_state_dump.computed_effects.clone();
1583 let effects = IotaTransactionBlockEffects::try_from(effects).unwrap();
1584 let config_objects = self.add_config_objects_if_needed(effects.status());
1587
1588 let input_objs = orig_tx
1592 .transaction_data()
1593 .input_objects()
1594 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1595 let tx_kind_orig = orig_tx.transaction_data().kind();
1596
1597 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1599
1600 let shared_object_refs: Vec<ObjectRef> = effects
1601 .shared_objects()
1602 .iter()
1603 .map(|so_ref| {
1604 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1605 unimplemented!(
1606 "Replay of deleted shared object transactions is not supported yet"
1607 );
1608 } else {
1609 so_ref.to_object_ref()
1610 }
1611 })
1612 .collect();
1613 let gas_data = orig_tx.transaction_data().gas_data();
1614 let gas_object_refs: Vec<_> = gas_data.clone().payment;
1615 let receiving_objs = orig_tx
1616 .transaction_data()
1617 .receiving_objects()
1618 .into_iter()
1619 .map(|(obj_id, version, _)| (obj_id, version))
1620 .collect();
1621
1622 let epoch_id = dp.node_state_dump.executed_epoch;
1623
1624 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1625
1626 let protocol_config =
1627 ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1628 let (epoch_start_timestamp, reference_gas_price) = self
1630 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1631 .await?;
1632
1633 Ok(OnChainTransactionInfo {
1634 kind: tx_kind_orig.clone(),
1635 sender,
1636 modified_at_versions,
1637 input_objects: input_objs,
1638 shared_object_refs,
1639 gas: gas_object_refs,
1640 gas_budget: gas_data.budget,
1641 gas_price: gas_data.price,
1642 executed_epoch: epoch_id,
1643 dependencies: effects.dependencies().to_vec(),
1644 effects,
1645 receiving_objs,
1646 config_objects,
1647 protocol_version: protocol_config.version,
1648 tx_digest: *tx_digest,
1649 epoch_start_timestamp,
1650 sender_signed_data: orig_tx.clone(),
1651 reference_gas_price,
1652 chain,
1653 })
1654 }
1655
1656 async fn resolve_download_input_objects(
1657 &mut self,
1658 tx_info: &OnChainTransactionInfo,
1659 deleted_shared_objects: Vec<ObjectRef>,
1660 ) -> Result<InputObjects, ReplayEngineError> {
1661 let mut package_inputs = vec![];
1663 let mut imm_owned_inputs = vec![];
1664 let mut shared_inputs = vec![];
1665 let mut deleted_shared_info_map = BTreeMap::new();
1666
1667 if !deleted_shared_objects.is_empty() {
1671 for tx_digest in tx_info.dependencies.iter() {
1672 let tx_info = self.resolve_tx_components(tx_digest).await?;
1673 for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1674 deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1675 }
1676 }
1677 }
1678
1679 tx_info
1680 .input_objects
1681 .iter()
1682 .map(|kind| match kind {
1683 InputObjectKind::MovePackage(i) => {
1684 package_inputs.push(*i);
1685 Ok(())
1686 }
1687 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1688 imm_owned_inputs.push((o_ref.0, o_ref.1));
1689 Ok(())
1690 }
1691 InputObjectKind::SharedMoveObject {
1692 id,
1693 initial_shared_version: _,
1694 mutable: _,
1695 } if !deleted_shared_info_map.contains_key(id) => {
1696 if let Some(o) = self
1698 .storage
1699 .live_objects_store
1700 .lock()
1701 .expect("Can't lock")
1702 .get(id)
1703 {
1704 shared_inputs.push(o.clone());
1705 Ok(())
1706 } else {
1707 Err(ReplayEngineError::InternalCacheInvariantViolation {
1708 id: *id,
1709 version: None,
1710 })
1711 }
1712 }
1713 _ => Ok(()),
1714 })
1715 .collect::<Result<Vec<_>, _>>()?;
1716
1717 let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1719
1720 in_objs.extend(
1723 self.multi_download_relevant_packages_and_store(
1724 package_inputs,
1725 tx_info.protocol_version.as_u64(),
1726 )
1727 .await?,
1728 );
1729 in_objs.extend(shared_inputs);
1731
1732 let resolved_input_objs = tx_info
1735 .input_objects
1736 .iter()
1737 .flat_map(|kind| match kind {
1738 InputObjectKind::MovePackage(i) => {
1739 Some(ObjectReadResult::new(
1741 *kind,
1742 self.storage
1743 .package_cache
1744 .lock()
1745 .expect("Cannot lock")
1746 .get(i)
1747 .unwrap_or(
1748 &self
1749 .download_latest_object(i)
1750 .expect("Object download failed")
1751 .expect("Object not found on chain"),
1752 )
1753 .clone()
1754 .into(),
1755 ))
1756 }
1757 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1758 *kind,
1759 self.storage
1760 .object_version_cache
1761 .lock()
1762 .expect("Cannot lock")
1763 .get(&(o_ref.0, o_ref.1))
1764 .unwrap()
1765 .clone()
1766 .into(),
1767 )),
1768 InputObjectKind::SharedMoveObject { id, .. }
1769 if !deleted_shared_info_map.contains_key(id) =>
1770 {
1771 Some(ObjectReadResult::new(
1773 *kind,
1774 self.storage
1775 .live_objects_store
1776 .lock()
1777 .expect("Can't lock")
1778 .get(id)
1779 .unwrap()
1780 .clone()
1781 .into(),
1782 ))
1783 }
1784 InputObjectKind::SharedMoveObject { id, .. } => {
1785 let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1786 Some(ObjectReadResult::new(
1787 *kind,
1788 ObjectReadResultKind::DeletedSharedObject(*version, *digest),
1789 ))
1790 }
1791 })
1792 .collect();
1793
1794 Ok(InputObjects::new(resolved_input_objs))
1795 }
1796
1797 async fn initialize_execution_env_state(
1800 &mut self,
1801 tx_info: &OnChainTransactionInfo,
1802 ) -> Result<InputObjects, ReplayEngineError> {
1803 self.current_protocol_version = tx_info.protocol_version.as_u64();
1805
1806 self.multi_download_and_store(&tx_info.modified_at_versions)
1808 .await?;
1809
1810 let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1811 .shared_object_refs
1812 .iter()
1813 .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1814
1815 let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1817 self.multi_download_and_store(&shared_refs).await?;
1818
1819 let gas_refs: Vec<_> = tx_info.gas.iter().map(|w| (w.0, w.1)).collect();
1822 self.multi_download_and_store(&gas_refs).await?;
1823
1824 let input_objs = self
1826 .resolve_download_input_objects(tx_info, deleted_shared_refs)
1827 .await?;
1828
1829 self.multi_download_and_store(&tx_info.receiving_objs)
1831 .await?;
1832
1833 self.multi_download_and_store(&tx_info.config_objects)
1835 .await?;
1836
1837 let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1841 self.multi_download_and_store(&loaded_child_refs).await?;
1842 tokio::task::yield_now().await;
1843
1844 Ok(input_objs)
1845 }
1846}
1847
1848impl BackingPackageStore for LocalExec {
1852 fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1856 fn inner(self_: &LocalExec, package_id: &ObjectID) -> IotaResult<Option<Object>> {
1857 self_
1859 .get_or_download_object(package_id, true )
1860 .map_err(|e| IotaError::Storage(e.to_string()))
1861 }
1862
1863 let res = inner(self, package_id);
1864 self.exec_store_events
1865 .lock()
1866 .expect("Unable to lock events list")
1867 .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1868 package_id: *package_id,
1869 result: res.clone(),
1870 });
1871 res.map(|o| o.map(PackageObject::new))
1872 }
1873}
1874
1875impl ChildObjectResolver for LocalExec {
1876 fn read_child_object(
1879 &self,
1880 parent: &ObjectID,
1881 child: &ObjectID,
1882 child_version_upper_bound: SequenceNumber,
1883 ) -> IotaResult<Option<Object>> {
1884 fn inner(
1885 self_: &LocalExec,
1886 parent: &ObjectID,
1887 child: &ObjectID,
1888 child_version_upper_bound: SequenceNumber,
1889 ) -> IotaResult<Option<Object>> {
1890 let child_object =
1891 match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1892 None => return Ok(None),
1893 Some(o) => o,
1894 };
1895 let child_version = child_object.version();
1896 if child_object.version() > child_version_upper_bound {
1897 return Err(IotaError::Unknown(format!(
1898 "Invariant Violation. Replay loaded child_object {child} at version \
1899 {child_version} but expected the version to be <= {child_version_upper_bound}"
1900 )));
1901 }
1902 let parent = *parent;
1903 if child_object.owner != Owner::ObjectOwner(parent.into()) {
1904 return Err(IotaError::InvalidChildObjectAccess {
1905 object: *child,
1906 given_parent: parent,
1907 actual_owner: child_object.owner,
1908 });
1909 }
1910 Ok(Some(child_object))
1911 }
1912
1913 let res = inner(self, parent, child, child_version_upper_bound);
1914 self.exec_store_events
1915 .lock()
1916 .expect("Unable to lock events list")
1917 .push(
1918 ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1919 parent: *parent,
1920 child: *child,
1921 result: res.clone(),
1922 },
1923 );
1924 res
1925 }
1926
1927 fn get_object_received_at_version(
1928 &self,
1929 owner: &ObjectID,
1930 receiving_object_id: &ObjectID,
1931 receive_object_at_version: SequenceNumber,
1932 _epoch_id: EpochId,
1933 ) -> IotaResult<Option<Object>> {
1934 fn inner(
1935 self_: &LocalExec,
1936 owner: &ObjectID,
1937 receiving_object_id: &ObjectID,
1938 receive_object_at_version: SequenceNumber,
1939 ) -> IotaResult<Option<Object>> {
1940 let recv_object = match self_.get_object(receiving_object_id)? {
1941 None => return Ok(None),
1942 Some(o) => o,
1943 };
1944 if recv_object.version() != receive_object_at_version {
1945 return Err(IotaError::Unknown(format!(
1946 "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
1947 {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
1948 )));
1949 }
1950 if recv_object.owner != Owner::AddressOwner((*owner).into()) {
1951 return Ok(None);
1952 }
1953 Ok(Some(recv_object))
1954 }
1955
1956 let res = inner(self, owner, receiving_object_id, receive_object_at_version);
1957 self.exec_store_events
1958 .lock()
1959 .expect("Unable to lock events list")
1960 .push(ExecutionStoreEvent::ReceiveObject {
1961 owner: *owner,
1962 receive: *receiving_object_id,
1963 receive_at_version: receive_object_at_version,
1964 result: res.clone(),
1965 });
1966 res
1967 }
1968}
1969
1970impl ResourceResolver for LocalExec {
1971 type Error = IotaError;
1972
1973 fn get_resource(
1977 &self,
1978 address: &AccountAddress,
1979 typ: &StructTag,
1980 ) -> IotaResult<Option<Vec<u8>>> {
1981 fn inner(
1982 self_: &LocalExec,
1983 address: &AccountAddress,
1984 typ: &StructTag,
1985 ) -> IotaResult<Option<Vec<u8>>> {
1986 let Some(object) = self_.get_or_download_object(
1988 &ObjectID::from(*address),
1989 false, )?
1991 else {
1992 return Ok(None);
1993 };
1994
1995 match &object.data {
1996 Data::Move(m) => {
1997 assert!(
1998 m.is_type(typ),
1999 "Invariant violation: ill-typed object in storage \
2000 or bad object request from caller"
2001 );
2002 Ok(Some(m.contents().to_vec()))
2003 }
2004 other => unimplemented!(
2005 "Bad object lookup: expected Move object, but got {:#?}",
2006 other
2007 ),
2008 }
2009 }
2010
2011 let res = inner(self, address, typ);
2012 self.exec_store_events
2013 .lock()
2014 .expect("Unable to lock events list")
2015 .push(ExecutionStoreEvent::ResourceResolverGetResource {
2016 address: *address,
2017 typ: typ.clone(),
2018 result: res.clone(),
2019 });
2020 res
2021 }
2022}
2023
2024impl ModuleResolver for LocalExec {
2025 type Error = IotaError;
2026
2027 fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2030 fn inner(self_: &LocalExec, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2031 get_module(self_, module_id)
2032 }
2033
2034 let res = inner(self, module_id);
2035 self.exec_store_events
2036 .lock()
2037 .expect("Unable to lock events list")
2038 .push(ExecutionStoreEvent::ModuleResolverGetModule {
2039 module_id: module_id.clone(),
2040 result: res.clone(),
2041 });
2042 res
2043 }
2044}
2045
2046impl ModuleResolver for &mut LocalExec {
2047 type Error = IotaError;
2048
2049 fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2050 (**self).get_module(module_id)
2053 }
2054}
2055
2056impl ObjectStore for LocalExec {
2057 fn get_object(
2060 &self,
2061 object_id: &ObjectID,
2062 ) -> iota_types::storage::error::Result<Option<Object>> {
2063 let res = self
2064 .storage
2065 .live_objects_store
2066 .lock()
2067 .expect("Can't lock")
2068 .get(object_id)
2069 .cloned();
2070 self.exec_store_events
2071 .lock()
2072 .expect("Unable to lock events list")
2073 .push(ExecutionStoreEvent::ObjectStoreGetObject {
2074 object_id: *object_id,
2075 result: Ok(res.clone()),
2076 });
2077 Ok(res)
2078 }
2079
2080 fn get_object_by_key(
2083 &self,
2084 object_id: &ObjectID,
2085 version: VersionNumber,
2086 ) -> iota_types::storage::error::Result<Option<Object>> {
2087 let res = self
2088 .storage
2089 .live_objects_store
2090 .lock()
2091 .expect("Can't lock")
2092 .get(object_id)
2093 .and_then(|obj| {
2094 if obj.version() == version {
2095 Some(obj.clone())
2096 } else {
2097 None
2098 }
2099 });
2100
2101 self.exec_store_events
2102 .lock()
2103 .expect("Unable to lock events list")
2104 .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2105 object_id: *object_id,
2106 version,
2107 result: Ok(res.clone()),
2108 });
2109
2110 Ok(res)
2111 }
2112}
2113
2114impl ObjectStore for &mut LocalExec {
2115 fn get_object(
2116 &self,
2117 object_id: &ObjectID,
2118 ) -> iota_types::storage::error::Result<Option<Object>> {
2119 (**self).get_object(object_id)
2122 }
2123
2124 fn get_object_by_key(
2125 &self,
2126 object_id: &ObjectID,
2127 version: VersionNumber,
2128 ) -> iota_types::storage::error::Result<Option<Object>> {
2129 (**self).get_object_by_key(object_id, version)
2132 }
2133}
2134
2135impl GetModule for LocalExec {
2136 type Error = IotaError;
2137 type Item = CompiledModule;
2138
2139 fn get_module_by_id(&self, id: &ModuleId) -> IotaResult<Option<Self::Item>> {
2140 let res = get_module_by_id(self, id);
2141
2142 self.exec_store_events
2143 .lock()
2144 .expect("Unable to lock events list")
2145 .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2146 id: id.clone(),
2147 result: res.clone(),
2148 });
2149 res
2150 }
2151}
2152
2153pub fn get_executor(
2156 executor_version_override: Option<i64>,
2157 protocol_config: &ProtocolConfig,
2158 _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2159 enable_profiler: Option<PathBuf>,
2160) -> Arc<dyn Executor + Send + Sync> {
2161 let protocol_config = executor_version_override
2162 .map(|q| {
2163 let ver = if q < 0 {
2164 ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2165 } else {
2166 q as u64
2167 };
2168
2169 let mut c = protocol_config.clone();
2170 c.set_execution_version_for_testing(ver);
2171 c
2172 })
2173 .unwrap_or(protocol_config.clone());
2174
2175 let silent = true;
2176 iota_execution::executor(&protocol_config, silent, enable_profiler)
2177 .expect("Creating an executor should not fail here")
2178}
2179
2180fn parse_effect_error_for_denied_coins(status: &IotaExecutionStatus) -> Option<String> {
2181 let IotaExecutionStatus::Failure { error } = status else {
2182 return None;
2183 };
2184 parse_denied_error_string(error)
2185}
2186
2187fn parse_denied_error_string(error: &str) -> Option<String> {
2188 let regulated_regex = regex::Regex::new(
2189 r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2190 )
2191 .unwrap();
2192
2193 let caps = regulated_regex.captures(error)?;
2194 Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2195}
2196
2197#[cfg(test)]
2198mod tests {
2199 use super::parse_denied_error_string;
2200 #[test]
2201 fn test_regex_regulated_coin_errors() {
2202 let test_bank = vec![
2203 "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
2204 "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
2205 ];
2206 let expected_string =
2207 "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";
2208
2209 for test in &test_bank {
2210 assert!(parse_denied_error_string(test).unwrap() == expected_string);
2211 }
2212 }
2213}