iota_replay/
replay.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashSet},
7    path::PathBuf,
8    sync::{Arc, Mutex},
9};
10
11use futures::executor::block_on;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_core::authority::NodeStateDump;
14use iota_execution::Executor;
15use iota_framework::BuiltInFramework;
16use iota_json_rpc_types::{
17    IotaExecutionStatus, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI,
18};
19use iota_protocol_config::{Chain, ProtocolConfig};
20use iota_sdk::{IotaClient, IotaClientBuilder};
21use iota_types::{
22    IOTA_DENY_LIST_OBJECT_ID,
23    base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
24    committee::EpochId,
25    digests::{ObjectDigest, TransactionDigest},
26    error::{ExecutionError, IotaError, IotaResult},
27    executable_transaction::VerifiedExecutableTransaction,
28    gas::IotaGasStatus,
29    in_memory_storage::InMemoryStorage,
30    inner_temporary_store::InnerTemporaryStore,
31    message_envelope::Message,
32    metrics::LimitsMetrics,
33    object::{Data, Object, Owner},
34    storage::{
35        BackingPackageStore, ChildObjectResolver, ObjectStore, PackageObject, get_module,
36        get_module_by_id,
37    },
38    transaction::{
39        CheckedInputObjects, GasData, InputObjectKind, InputObjects, ObjectReadResult,
40        ObjectReadResultKind, SenderSignedData, Transaction, TransactionDataAPI,
41        TransactionKind::{self, ProgrammableTransaction},
42        VerifiedTransaction,
43    },
44};
45use move_binary_format::CompiledModule;
46use move_bytecode_utils::module_cache::GetModule;
47use move_core_types::{
48    account_address::AccountAddress,
49    language_storage::{ModuleId, StructTag},
50    resolver::{ModuleResolver, ResourceResolver},
51};
52use prometheus::Registry;
53use serde::{Deserialize, Serialize};
54use similar::{ChangeTag, TextDiff};
55use tracing::{error, info, trace, warn};
56
57use crate::{
58    chain_from_chain_id,
59    data_fetcher::{
60        DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
61    },
62    displays::{
63        Pretty,
64        transaction_displays::{FullPTB, transform_command_results_to_annotated},
65    },
66    types::*,
67};
68
69// TODO: add persistent cache. But perf is good enough already.
70
/// Snapshot of a locally replayed transaction: the on-chain description of
/// the transaction, the objects gathered for the replay and the results of
/// the local execution.
///
/// The temporary store and the execution status are marked `#[serde(skip)]`
/// and are therefore not part of the serialized form.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExecutionSandboxState {
    /// Information describing the transaction
    pub transaction_info: OnChainTransactionInfo,
    /// All the objects that are required for the execution of the transaction
    pub required_objects: Vec<Object>,
    /// Temporary store from executing this locally in
    /// `execute_transaction_to_effects`
    #[serde(skip)]
    pub local_exec_temporary_store: Option<InnerTemporaryStore>,
    /// Effects from executing this locally in `execute_transaction_to_effects`
    pub local_exec_effects: IotaTransactionBlockEffects,
    /// Status from executing this locally in `execute_transaction_to_effects`
    #[serde(skip)]
    pub local_exec_status: Option<Result<(), ExecutionError>>,
}
87
88impl ExecutionSandboxState {
89    pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
90        if self.transaction_info.effects != self.local_exec_effects {
91            error!("Replay tool forked {}", self.transaction_info.tx_digest);
92            let diff = self.diff_effects();
93            println!("{diff}");
94            return Err(ReplayEngineError::EffectsForked {
95                digest: self.transaction_info.tx_digest,
96                diff: format!("\n{diff}"),
97                on_chain: Box::new(self.transaction_info.effects.clone()),
98                local: Box::new(self.local_exec_effects.clone()),
99            });
100        }
101        Ok(())
102    }
103
104    /// Utility to diff effects in a human readable format
105    pub fn diff_effects(&self) -> String {
106        let eff1 = &self.transaction_info.effects;
107        let eff2 = &self.local_exec_effects;
108        let on_chain_str = format!("{eff1:#?}");
109        let local_chain_str = format!("{eff2:#?}");
110        let mut res = vec![];
111
112        let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
113        for change in diff.iter_all_changes() {
114            let sign = match change.tag() {
115                ChangeTag::Delete => "---",
116                ChangeTag::Insert => "+++",
117                ChangeTag::Equal => "   ",
118            };
119            res.push(format!("{sign}{change}"));
120        }
121
122        res.join("")
123    }
124}
125
/// Epoch and checkpoint range covered by a single protocol version, together
/// with the transaction that triggered the epoch change.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolVersionSummary {
    /// Protocol version at this point
    pub protocol_version: u64,
    /// The first epoch that uses this protocol version
    pub epoch_start: u64,
    /// The last epoch that uses this protocol version
    pub epoch_end: u64,
    /// The first checkpoint in this protocol version
    pub checkpoint_start: Option<u64>,
    /// The last checkpoint in this protocol version
    pub checkpoint_end: Option<u64>,
    /// The transaction which triggered this epoch change
    pub epoch_change_tx: TransactionDigest,
}
141
/// In-memory object state used during replay.
///
/// Every map sits behind an `Arc<Mutex<..>>`, so cloning a `Storage` yields a
/// handle to the same underlying maps rather than a deep copy.
#[derive(Clone)]
pub struct Storage {
    /// These are objects at the frontier of the execution's view
    /// They might not be the latest object currently but they are the latest
    /// objects for the TX at the time it was run
    /// This store cannot be shared between runners
    pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,

    /// Package cache and object version cache can be shared between runners
    /// Non system packages are immutable so we can cache these
    pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
    /// Object contents are frozen at their versions so we can cache these
    /// We must place system packages here as well
    pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
}
157
158impl std::fmt::Display for Storage {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        writeln!(f, "Live object store")?;
161        for (id, obj) in self
162            .live_objects_store
163            .lock()
164            .expect("Unable to lock")
165            .iter()
166        {
167            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
168        }
169        writeln!(f, "Package cache")?;
170        for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
171            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
172        }
173        writeln!(f, "Object version cache")?;
174        for (id, _) in self
175            .object_version_cache
176            .lock()
177            .expect("Unable to lock")
178            .iter()
179        {
180            writeln!(f, "{}: {}", id.0, id.1)?;
181        }
182
183        write!(f, "")
184    }
185}
186
187impl Storage {
188    pub fn default() -> Self {
189        Self {
190            live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
191            package_cache: Arc::new(Mutex::new(BTreeMap::new())),
192            object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
193        }
194    }
195
196    pub fn all_objects(&self) -> Vec<Object> {
197        self.live_objects_store
198            .lock()
199            .expect("Unable to lock")
200            .values()
201            .cloned()
202            .chain(
203                self.package_cache
204                    .lock()
205                    .expect("Unable to lock")
206                    .values()
207                    .cloned(),
208            )
209            .chain(
210                self.object_version_cache
211                    .lock()
212                    .expect("Unable to lock")
213                    .values()
214                    .cloned(),
215            )
216            .collect::<Vec<_>>()
217    }
218}
219
/// Local replay engine: bundles the data fetcher, the object storage, the
/// protocol version tables and the optional overrides used when executing a
/// transaction locally.
#[derive(Clone)]
pub struct LocalExec {
    // RPC client; `None` when the engine is built from a node state dump
    pub client: Option<IotaClient>,
    // For a given protocol version, what TX created it, and what is the valid range of epochs
    // at this protocol version.
    pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
    // For a given protocol version, the mapping valid sequence numbers for each framework package
    pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
    // The current protocol version for this execution
    pub current_protocol_version: u64,
    // All state is contained here
    pub storage: Storage,
    // Debug events
    pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
    // Limits metrics handed to the executor (created on a throwaway registry by the constructors)
    pub metrics: Arc<LimitsMetrics>,
    // Used for fetching data from the network or remote store
    pub fetcher: Fetchers,

    // One can optionally override the executor version
    // -1 implies use latest version
    pub executor_version: Option<i64>,
    // One can optionally override the protocol version
    // -1 implies use latest version
    // None implies use the protocol version at the time of execution
    pub protocol_version: Option<i64>,
    // Whether or not to enable the gas profiler, the PathBuf contains either a user specified
    // filepath or the default current directory and name format for the profile output
    pub enable_profiler: Option<PathBuf>,
    // NOTE(review): presumably (object id, version) overrides for config objects -- confirm
    // against the callers of `execute_transaction`
    pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
    // Retry policies due to RPC errors
    pub num_retries_for_timeout: u32,
    pub sleep_period_for_timeout: std::time::Duration,
}
254
255impl LocalExec {
256    /// Wrapper around fetcher in case we want to add more functionality
257    /// Such as fetching from local DB from snapshot
258    pub async fn multi_download(
259        &self,
260        objs: &[(ObjectID, SequenceNumber)],
261    ) -> Result<Vec<Object>, ReplayEngineError> {
262        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
263        while num_retries_for_timeout >= 0 {
264            match self.fetcher.multi_get_versioned(objs).await {
265                Ok(objs) => return Ok(objs),
266                Err(ReplayEngineError::IotaRpcRequestTimeout) => {
267                    warn!(
268                        "RPC request timed out. Retries left {}. Sleeping for {}s",
269                        num_retries_for_timeout,
270                        self.sleep_period_for_timeout.as_secs()
271                    );
272                    num_retries_for_timeout -= 1;
273                    tokio::time::sleep(self.sleep_period_for_timeout).await;
274                }
275                Err(e) => return Err(e),
276            }
277        }
278        Err(ReplayEngineError::IotaRpcRequestTimeout)
279    }
280    /// Wrapper around fetcher in case we want to add more functionality
281    /// Such as fetching from local DB from snapshot
282    pub async fn multi_download_latest(
283        &self,
284        objs: &[ObjectID],
285    ) -> Result<Vec<Object>, ReplayEngineError> {
286        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
287        while num_retries_for_timeout >= 0 {
288            match self.fetcher.multi_get_latest(objs).await {
289                Ok(objs) => return Ok(objs),
290                Err(ReplayEngineError::IotaRpcRequestTimeout) => {
291                    warn!(
292                        "RPC request timed out. Retries left {}. Sleeping for {}s",
293                        num_retries_for_timeout,
294                        self.sleep_period_for_timeout.as_secs()
295                    );
296                    num_retries_for_timeout -= 1;
297                    tokio::time::sleep(self.sleep_period_for_timeout).await;
298                }
299                Err(e) => return Err(e),
300            }
301        }
302        Err(ReplayEngineError::IotaRpcRequestTimeout)
303    }
304
305    pub async fn fetch_loaded_child_refs(
306        &self,
307        tx_digest: &TransactionDigest,
308    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
309        // Get the child objects loaded
310        self.fetcher.get_loaded_child_objects(tx_digest).await
311    }
312
313    pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
314        Self::new_for_remote(
315            IotaClientBuilder::default()
316                .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
317                .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
318                .build(http_url)
319                .await?,
320            None,
321        )
322        .await
323    }
324
325    pub async fn replay_with_network_config(
326        rpc_url: String,
327        tx_digest: TransactionDigest,
328        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
329        use_authority: bool,
330        executor_version: Option<i64>,
331        protocol_version: Option<i64>,
332        enable_profiler: Option<PathBuf>,
333        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
334    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
335        info!("Using RPC URL: {}", rpc_url);
336        LocalExec::new_from_fn_url(&rpc_url)
337            .await?
338            .init_for_execution()
339            .await?
340            .execute_transaction(
341                &tx_digest,
342                expensive_safety_check_config,
343                use_authority,
344                executor_version,
345                protocol_version,
346                enable_profiler,
347                config_and_versions,
348            )
349            .await
350    }
351
    /// This captures the state of the network at a given point in time and
    /// populates protocol version tables including which system packages to
    /// fetch. If this function is called across epoch boundaries, the info
    /// might be stale. But it should only be called once per epoch.
    ///
    /// Consumes `self` and returns it once the tables have been populated.
    pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
        self.populate_protocol_version_tables().await?;
        tokio::task::yield_now().await;
        Ok(self)
    }
361
362    pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
363        Self::new_for_remote(
364            self.client.expect("Remote client not initialized"),
365            Some(self.fetcher.into_remote()),
366        )
367        .await?
368        .init_for_execution()
369        .await
370    }
371
372    pub async fn new_for_remote(
373        client: IotaClient,
374        remote_fetcher: Option<RemoteFetcher>,
375    ) -> Result<Self, ReplayEngineError> {
376        // Use a throwaway metrics registry for local execution.
377        let registry = prometheus::Registry::new();
378        let metrics = Arc::new(LimitsMetrics::new(&registry));
379
380        let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
381
382        Ok(Self {
383            client: Some(client),
384            protocol_version_epoch_table: BTreeMap::new(),
385            protocol_version_system_package_table: BTreeMap::new(),
386            current_protocol_version: 0,
387            exec_store_events: Arc::new(Mutex::new(Vec::new())),
388            metrics,
389            storage: Storage::default(),
390            fetcher: Fetchers::Remote(fetcher),
391            // TODO: make these configurable
392            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
393            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
394            executor_version: None,
395            protocol_version: None,
396            enable_profiler: None,
397            config_and_versions: None,
398        })
399    }
400
401    pub async fn new_for_state_dump(
402        path: &str,
403        backup_rpc_url: Option<String>,
404    ) -> Result<Self, ReplayEngineError> {
405        // Use a throwaway metrics registry for local execution.
406        let registry = prometheus::Registry::new();
407        let metrics = Arc::new(LimitsMetrics::new(&registry));
408
409        let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
410        let current_protocol_version = state.protocol_version;
411        let fetcher = match backup_rpc_url {
412            Some(url) => NodeStateDumpFetcher::new(
413                state,
414                Some(RemoteFetcher::new(
415                    IotaClientBuilder::default()
416                        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
417                        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
418                        .build(url)
419                        .await?,
420                )),
421            ),
422            None => NodeStateDumpFetcher::new(state, None),
423        };
424
425        Ok(Self {
426            client: None,
427            protocol_version_epoch_table: BTreeMap::new(),
428            protocol_version_system_package_table: BTreeMap::new(),
429            current_protocol_version,
430            exec_store_events: Arc::new(Mutex::new(Vec::new())),
431            metrics,
432            storage: Storage::default(),
433            fetcher: Fetchers::NodeStateDump(fetcher),
434            // TODO: make these configurable
435            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
436            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
437            executor_version: None,
438            protocol_version: None,
439            enable_profiler: None,
440            config_and_versions: None,
441        })
442    }
443
444    pub async fn multi_download_and_store(
445        &mut self,
446        objs: &[(ObjectID, SequenceNumber)],
447    ) -> Result<Vec<Object>, ReplayEngineError> {
448        let objs = self.multi_download(objs).await?;
449
450        // Backfill the store
451        for obj in objs.iter() {
452            let o_ref = obj.compute_object_reference();
453            self.storage
454                .live_objects_store
455                .lock()
456                .expect("Can't lock")
457                .insert(o_ref.0, obj.clone());
458            self.storage
459                .object_version_cache
460                .lock()
461                .expect("Cannot lock")
462                .insert((o_ref.0, o_ref.1), obj.clone());
463            if obj.is_package() {
464                self.storage
465                    .package_cache
466                    .lock()
467                    .expect("Cannot lock")
468                    .insert(o_ref.0, obj.clone());
469            }
470        }
471        tokio::task::yield_now().await;
472        Ok(objs)
473    }
474
475    pub async fn multi_download_relevant_packages_and_store(
476        &mut self,
477        objs: Vec<ObjectID>,
478        protocol_version: u64,
479    ) -> Result<Vec<Object>, ReplayEngineError> {
480        let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
481            BuiltInFramework::genesis_objects().collect()
482        } else {
483            let syst_packages =
484                self.system_package_versions_for_protocol_version(protocol_version)?;
485            self.multi_download(&syst_packages).await?
486        };
487
488        // Download latest version of all packages that are not system packages
489        // This is okay since the versions can never change
490        let non_system_package_objs: Vec<_> = objs
491            .into_iter()
492            .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
493            .collect();
494        let objs = self
495            .multi_download_latest(&non_system_package_objs)
496            .await?
497            .into_iter()
498            .chain(syst_packages_objs.into_iter());
499
500        for obj in objs.clone() {
501            let o_ref = obj.compute_object_reference();
502            // We dont always want the latest in store
503            // self.storage.store.insert(o_ref.0, obj.clone());
504            self.storage
505                .object_version_cache
506                .lock()
507                .expect("Cannot lock")
508                .insert((o_ref.0, o_ref.1), obj.clone());
509            if obj.is_package() {
510                self.storage
511                    .package_cache
512                    .lock()
513                    .expect("Cannot lock")
514                    .insert(o_ref.0, obj.clone());
515            }
516        }
517        Ok(objs.collect())
518    }
519
520    // TODO: remove this after `futures::executor::block_on` is removed.
521    #[expect(clippy::disallowed_methods)]
522    pub fn download_object(
523        &self,
524        object_id: &ObjectID,
525        version: SequenceNumber,
526    ) -> Result<Object, ReplayEngineError> {
527        if self
528            .storage
529            .object_version_cache
530            .lock()
531            .expect("Cannot lock")
532            .contains_key(&(*object_id, version))
533        {
534            return Ok(self
535                .storage
536                .object_version_cache
537                .lock()
538                .expect("Cannot lock")
539                .get(&(*object_id, version))
540                .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
541                    id: *object_id,
542                    version: Some(version),
543                })?
544                .clone());
545        }
546
547        let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
548            q.pop().unwrap_or_else(|| {
549                panic!(
550                    "Downloaded obj response cannot be empty {:?}",
551                    (*object_id, version)
552                )
553            })
554        })?;
555
556        let o_ref = o.compute_object_reference();
557        self.storage
558            .object_version_cache
559            .lock()
560            .expect("Cannot lock")
561            .insert((o_ref.0, o_ref.1), o.clone());
562        Ok(o)
563    }
564
565    // TODO: remove this after `futures::executor::block_on` is removed.
566    #[expect(clippy::disallowed_methods)]
567    pub fn download_latest_object(
568        &self,
569        object_id: &ObjectID,
570    ) -> Result<Option<Object>, ReplayEngineError> {
571        let resp = block_on({
572            // info!("Downloading latest object {object_id}");
573            self.multi_download_latest(&[*object_id])
574        })
575        .map(|mut q| {
576            q.pop()
577                .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
578        });
579
580        match resp {
581            Ok(v) => Ok(Some(v)),
582            Err(ReplayEngineError::ObjectNotExist { id }) => {
583                error!(
584                    "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
585                );
586                Ok(None)
587            }
588            Err(ReplayEngineError::ObjectDeleted {
589                id,
590                version,
591                digest,
592            }) => {
593                error!("Object {id} {version} {digest} was deleted on RPC server.");
594                Ok(None)
595            }
596            Err(err) => Err(ReplayEngineError::IotaRpcError {
597                err: err.to_string(),
598            }),
599        }
600    }
601
602    #[expect(clippy::disallowed_methods)]
603    pub fn download_object_by_upper_bound(
604        &self,
605        object_id: &ObjectID,
606        version_upper_bound: VersionNumber,
607    ) -> Result<Option<Object>, ReplayEngineError> {
608        let local_object = self
609            .storage
610            .live_objects_store
611            .lock()
612            .expect("Can't lock")
613            .get(object_id)
614            .cloned();
615        if local_object.is_some() {
616            return Ok(local_object);
617        }
618        let response = block_on({
619            self.fetcher
620                .get_child_object(object_id, version_upper_bound)
621        });
622        match response {
623            Ok(object) => {
624                let obj_ref = object.compute_object_reference();
625                self.storage
626                    .live_objects_store
627                    .lock()
628                    .expect("Can't lock")
629                    .insert(*object_id, object.clone());
630                self.storage
631                    .object_version_cache
632                    .lock()
633                    .expect("Can't lock")
634                    .insert((obj_ref.0, obj_ref.1), object.clone());
635                Ok(Some(object))
636            }
637            Err(ReplayEngineError::ObjectNotExist { id }) => {
638                error!(
639                    "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
640                );
641                Ok(None)
642            }
643            Err(ReplayEngineError::ObjectDeleted {
644                id,
645                version,
646                digest,
647            }) => {
648                error!("Object {id} {version} {digest} was deleted on RPC server.");
649                Ok(None)
650            }
651            // This is a child object which was not found in the store (e.g., due to exists
652            // check before creating the dynamic field).
653            Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
654                info!(
655                    "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
656                );
657                Ok(None)
658            }
659            Err(err) => Err(ReplayEngineError::IotaRpcError {
660                err: err.to_string(),
661            }),
662        }
663    }
664
665    pub async fn get_checkpoint_txs(
666        &self,
667        checkpoint_id: u64,
668    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
669        self.fetcher
670            .get_checkpoint_txs(checkpoint_id)
671            .await
672            .map_err(|e| ReplayEngineError::IotaRpcError { err: e.to_string() })
673    }
674
675    pub async fn execute_all_in_checkpoints(
676        &mut self,
677        checkpoint_ids: &[u64],
678        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
679        terminate_early: bool,
680        use_authority: bool,
681    ) -> Result<(u64, u64), ReplayEngineError> {
682        // Get all the TXs at this checkpoint
683        let mut txs = Vec::new();
684        for checkpoint_id in checkpoint_ids {
685            txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
686        }
687        let num = txs.len();
688        let mut succeeded = 0;
689        for tx in txs {
690            match self
691                .execute_transaction(
692                    &tx,
693                    expensive_safety_check_config.clone(),
694                    use_authority,
695                    None,
696                    None,
697                    None,
698                    None,
699                )
700                .await
701                .map(|q| q.check_effects())
702            {
703                Err(e) | Ok(Err(e)) => {
704                    if terminate_early {
705                        return Err(e);
706                    }
707                    error!("Error executing tx: {},  {:#?}", tx, e);
708                    continue;
709                }
710                _ => (),
711            }
712
713            succeeded += 1;
714        }
715        Ok((succeeded, num as u64))
716    }
717
718    pub async fn execution_engine_execute_with_tx_info_impl(
719        &mut self,
720        tx_info: &OnChainTransactionInfo,
721        override_transaction_kind: Option<TransactionKind>,
722        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
723    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
724        let tx_digest = &tx_info.tx_digest;
725
726        // Initialize the state necessary for execution
727        // Get the input objects
728        let input_objects = self.initialize_execution_env_state(tx_info).await?;
729        assert_eq!(
730            &input_objects.filter_shared_objects().len(),
731            &tx_info.shared_object_refs.len()
732        );
733        // At this point we have all the objects needed for replay
734
735        // This assumes we already initialized the protocol version table
736        // `protocol_version_epoch_table`
737        let protocol_config =
738            &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);
739
740        let metrics = self.metrics.clone();
741
742        let ov = self.executor_version;
743
744        // We could probably cache the executor per protocol config
745        let executor = get_executor(
746            ov,
747            protocol_config,
748            expensive_safety_check_config,
749            self.enable_profiler.clone(),
750        );
751
752        // All prep done
753        let expensive_checks = true;
754        let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
755        let certificate_deny_set = HashSet::new();
756        let gas_status = if tx_info.kind.is_system_tx() {
757            IotaGasStatus::new_unmetered()
758        } else {
759            IotaGasStatus::new(
760                tx_info.gas_budget,
761                tx_info.gas_price,
762                tx_info.reference_gas_price,
763                protocol_config,
764            )
765            .expect("Failed to create gas status")
766        };
767        let gas_data = GasData {
768            payment: tx_info.gas.clone(),
769            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
770            price: tx_info.gas_price,
771            budget: tx_info.gas_budget,
772        };
773        let (inner_store, gas_status, effects, result) = executor.execute_transaction_to_effects(
774            &self,
775            protocol_config,
776            metrics.clone(),
777            expensive_checks,
778            &certificate_deny_set,
779            &tx_info.executed_epoch,
780            tx_info.epoch_start_timestamp,
781            CheckedInputObjects::new_for_replay(input_objects.clone()),
782            gas_data,
783            gas_status,
784            transaction_kind.clone(),
785            tx_info.sender,
786            *tx_digest,
787            &mut None,
788        );
789
790        if let Err(err) = self.pretty_print_for_tracing(
791            &gas_status,
792            &executor,
793            tx_info,
794            &transaction_kind,
795            protocol_config,
796            metrics,
797            expensive_checks,
798            input_objects,
799        ) {
800            error!("Failed to pretty print for tracing: {:?}", err);
801        }
802
803        let all_required_objects = self.storage.all_objects();
804
805        let effects =
806            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
807
808        Ok(ExecutionSandboxState {
809            transaction_info: tx_info.clone(),
810            required_objects: all_required_objects,
811            local_exec_temporary_store: Some(inner_store),
812            local_exec_effects: effects,
813            local_exec_status: Some(result),
814        })
815    }
816
817    fn pretty_print_for_tracing(
818        &self,
819        gas_status: &IotaGasStatus,
820        executor: &Arc<dyn Executor + Send + Sync>,
821        tx_info: &OnChainTransactionInfo,
822        transaction_kind: &TransactionKind,
823        protocol_config: &ProtocolConfig,
824        metrics: Arc<LimitsMetrics>,
825        expensive_checks: bool,
826        input_objects: InputObjects,
827    ) -> anyhow::Result<()> {
828        trace!(target: "replay_gas_info", "{}", Pretty(gas_status));
829
830        let skip_checks = true;
831        let gas_data = GasData {
832            payment: tx_info.gas.clone(),
833            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
834            price: tx_info.gas_price,
835            budget: tx_info.gas_budget,
836        };
837        if let ProgrammableTransaction(pt) = transaction_kind {
838            trace!(
839                target: "replay_ptb_info",
840                "{}",
841                Pretty(&FullPTB {
842                    ptb: pt.clone(),
843                    results: transform_command_results_to_annotated(
844                        executor,
845                        &self.clone(),
846                        executor.dev_inspect_transaction(
847                            &self,
848                            protocol_config,
849                            metrics,
850                            expensive_checks,
851                            &HashSet::new(),
852                            &tx_info.executed_epoch,
853                            tx_info.epoch_start_timestamp,
854                            CheckedInputObjects::new_for_replay(input_objects),
855                            gas_data,
856                            IotaGasStatus::new(
857                                tx_info.gas_budget,
858                                tx_info.gas_price,
859                                tx_info.reference_gas_price,
860                                protocol_config,
861                            )?,
862                            transaction_kind.clone(),
863                            tx_info.sender,
864                            tx_info.sender_signed_data.digest(),
865                            skip_checks,
866                        )
867                        .3
868                        .unwrap_or_default(),
869                    )?,
870            }));
871        }
872        Ok(())
873    }
874
875    /// Must be called after `init_for_execution`
876    pub async fn execution_engine_execute_impl(
877        &mut self,
878        tx_digest: &TransactionDigest,
879        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
880    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
881        if self.is_remote_replay() {
882            assert!(
883                !self.protocol_version_system_package_table.is_empty()
884                    || !self.protocol_version_epoch_table.is_empty(),
885                "Required tables not populated. Must call `init_for_execution` before executing transactions"
886            );
887        }
888
889        let tx_info = if self.is_remote_replay() {
890            self.resolve_tx_components(tx_digest).await?
891        } else {
892            self.resolve_tx_components_from_dump(tx_digest).await?
893        };
894        self.execution_engine_execute_with_tx_info_impl(
895            &tx_info,
896            None,
897            expensive_safety_check_config,
898        )
899        .await
900    }
901
902    /// Executes a transaction with the state specified in `pre_run_sandbox`
903    /// This is useful for executing a transaction with a specific state
904    /// However if the state in invalid, the behavior is undefined.
905    pub async fn certificate_execute_with_sandbox_state(
906        pre_run_sandbox: &ExecutionSandboxState,
907    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
908        // These cannot be changed and are inherited from the sandbox state
909        let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
910        let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
911        let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
912        let protocol_config = ProtocolConfig::get_for_version(
913            pre_run_sandbox.transaction_info.protocol_version,
914            pre_run_sandbox.transaction_info.chain,
915        );
916        let required_objects = pre_run_sandbox.required_objects.clone();
917        let store = InMemoryStorage::new(required_objects.clone());
918
919        let transaction =
920            Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());
921
922        // TODO: This will not work for deleted shared objects. We need to persist that
923        // information in the sandbox. TODO: A lot of the following code is
924        // replicated in several places. We should introduce a few traits and
925        // make them shared so that we don't have to fix one by one when we have major
926        // execution layer changes.
927        let input_objects = store.read_input_objects_for_transaction(&transaction);
928        let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
929            VerifiedTransaction::new_unchecked(transaction),
930            executed_epoch,
931        );
932        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
933            &executable,
934            input_objects,
935            &protocol_config,
936            reference_gas_price,
937        )
938        .unwrap();
939        let (kind, signer, gas_data) = executable.transaction_data().execution_parts();
940        let executor = iota_execution::executor(&protocol_config, true, None).unwrap();
941        let (_, _, effects, exec_res) = executor.execute_transaction_to_effects(
942            &store,
943            &protocol_config,
944            Arc::new(LimitsMetrics::new(&Registry::new())),
945            true,
946            &HashSet::new(),
947            &executed_epoch,
948            epoch_start_timestamp,
949            input_objects,
950            gas_data,
951            gas_status,
952            kind,
953            signer,
954            *executable.digest(),
955            &mut None,
956        );
957
958        let effects =
959            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
960
961        Ok(ExecutionSandboxState {
962            transaction_info: pre_run_sandbox.transaction_info.clone(),
963            required_objects,
964            local_exec_temporary_store: None, // We dont capture it for cert exec run
965            local_exec_effects: effects,
966            local_exec_status: Some(exec_res),
967        })
968    }
969
970    /// Must be called after `init_for_execution`
971    /// This executes from
972    /// `iota_core::authority::AuthorityState::try_execute_immediately`
973    pub async fn certificate_execute(
974        &mut self,
975        tx_digest: &TransactionDigest,
976        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
977    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
978        // Use the lighterweight execution engine to get the pre-run state
979        let pre_run_sandbox = self
980            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
981            .await?;
982        Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
983    }
984
985    /// Must be called after `init_for_execution`
986    /// This executes from
987    /// `iota_adapter::execution_engine::execute_transaction_to_effects`
988    pub async fn execution_engine_execute(
989        &mut self,
990        tx_digest: &TransactionDigest,
991        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
992    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
993        let sandbox_state = self
994            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
995            .await?;
996
997        Ok(sandbox_state)
998    }
999
1000    pub async fn execute_state_dump(
1001        &mut self,
1002        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1003    ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
1004        assert!(!self.is_remote_replay());
1005
1006        let d = match self.fetcher.clone() {
1007            Fetchers::NodeStateDump(d) => d,
1008            _ => panic!("Invalid fetcher for state dump"),
1009        };
1010        let tx_digest = d.node_state_dump.clone().tx_digest;
1011        let sandbox_state = self
1012            .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1013            .await?;
1014
1015        Ok((sandbox_state, d.node_state_dump))
1016    }
1017
1018    pub async fn execute_transaction(
1019        &mut self,
1020        tx_digest: &TransactionDigest,
1021        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1022        use_authority: bool,
1023        executor_version: Option<i64>,
1024        protocol_version: Option<i64>,
1025        enable_profiler: Option<PathBuf>,
1026        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1027    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1028        self.executor_version = executor_version;
1029        self.protocol_version = protocol_version;
1030        self.enable_profiler = enable_profiler;
1031        self.config_and_versions = config_and_versions;
1032        if use_authority {
1033            self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1034                .await
1035        } else {
1036            self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1037                .await
1038        }
1039    }
    /// Returns the package IDs reported by `BuiltInFramework::all_package_ids`.
    ///
    /// `_protocol_version` is currently ignored: the same list is returned
    /// regardless of the version passed in.
    fn system_package_ids(_protocol_version: u64) -> Vec<ObjectID> {
        BuiltInFramework::all_package_ids()
    }
1043
1044    /// This is the only function which accesses the network during execution
1045    pub fn get_or_download_object(
1046        &self,
1047        obj_id: &ObjectID,
1048        package_expected: bool,
1049    ) -> Result<Option<Object>, ReplayEngineError> {
1050        if package_expected {
1051            if let Some(obj) = self
1052                .storage
1053                .package_cache
1054                .lock()
1055                .expect("Cannot lock")
1056                .get(obj_id)
1057            {
1058                return Ok(Some(obj.clone()));
1059            };
1060            // Check if its a system package because we must've downloaded all
1061            // TODO: Will return this check once we can download completely for
1062            // other networks assert!(
1063            //     !self.system_package_ids().contains(obj_id),
1064            //     "All system packages should be downloaded already"
1065            // );
1066        } else if let Some(obj) = self
1067            .storage
1068            .live_objects_store
1069            .lock()
1070            .expect("Can't lock")
1071            .get(obj_id)
1072        {
1073            return Ok(Some(obj.clone()));
1074        }
1075
1076        let Some(o) = self.download_latest_object(obj_id)? else {
1077            return Ok(None);
1078        };
1079
1080        if o.is_package() {
1081            assert!(
1082                package_expected,
1083                "Did not expect package but downloaded object is a package: {obj_id}"
1084            );
1085
1086            self.storage
1087                .package_cache
1088                .lock()
1089                .expect("Cannot lock")
1090                .insert(*obj_id, o.clone());
1091        }
1092        let o_ref = o.compute_object_reference();
1093        self.storage
1094            .object_version_cache
1095            .lock()
1096            .expect("Cannot lock")
1097            .insert((o_ref.0, o_ref.1), o.clone());
1098        Ok(Some(o))
1099    }
1100
    /// Returns `true` when the configured fetcher is the `Fetchers::Remote`
    /// variant, and `false` for any other fetcher.
    pub fn is_remote_replay(&self) -> bool {
        matches!(self.fetcher, Fetchers::Remote(_))
    }
1104
1105    /// Must be called after `populate_protocol_version_tables`
1106    pub fn system_package_versions_for_protocol_version(
1107        &self,
1108        protocol_version: u64,
1109    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1110        match &self.fetcher {
1111            Fetchers::Remote(_) => Ok(self
1112                .protocol_version_system_package_table
1113                .get(&protocol_version)
1114                .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1115                    protocol_version,
1116                })?
1117                .clone()
1118                .into_iter()
1119                .collect()),
1120
1121            Fetchers::NodeStateDump(d) => Ok(d
1122                .node_state_dump
1123                .relevant_system_packages
1124                .iter()
1125                .map(|w| (w.id, w.version, w.digest))
1126                .map(|q| (q.0, q.1))
1127                .collect()),
1128        }
1129    }
1130
    /// Builds a map from protocol version to a summary of the epoch and
    /// checkpoint range during which that version was in effect.
    ///
    /// The ranges are derived from the epoch change events returned by the
    /// fetcher. A new range starts whenever an event reports a protocol
    /// version different from the one of the range currently being tracked.
    ///
    /// # Errors
    /// Propagates fetcher errors and failures to extract the epoch and
    /// protocol version from an event.
    ///
    /// # Panics
    /// Panics if checkpoint 0 contains no transaction.
    pub async fn protocol_ver_to_epoch_map(
        &self,
    ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
        let mut range_map = BTreeMap::new();
        let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;

        // Exception for Genesis: Protocol version 1 at epoch 0
        // `tx_digest` tracks the transaction that started the current range; it
        // is seeded with the first transaction of checkpoint 0.
        let mut tx_digest = *self
            .fetcher
            .get_checkpoint_txs(0)
            .await?
            .first()
            .expect("Genesis TX must be in first checkpoint");
        // The genesis TX did not emit any event, but we know it was the start of
        // version 1, so this range has to be added manually.
        let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
            (0, 1, Some(0u64));

        // `curr_*` hold the values read from the most recently processed event.
        let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
            (start_epoch, start_protocol_version, start_checkpoint);

        // NOTE(review): this assigns back the values that were just copied from
        // `start_*`, so it does not change any of them - confirm whether it is
        // only here to silence a lint.
        (start_epoch, start_protocol_version, start_checkpoint) =
            (curr_epoch, curr_protocol_version, curr_checkpoint);

        // This is the final tx digest for the epoch change. We need this to track the
        // final checkpoint
        let mut end_epoch_tx_digest = tx_digest;

        for event in epoch_change_events {
            (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
            end_epoch_tx_digest = event.id.tx_digest;

            if start_protocol_version == curr_protocol_version {
                // Same range
                continue;
            }

            // The protocol version changed: look up the checkpoint of the
            // transaction that emitted this event.
            curr_checkpoint = self
                .fetcher
                .get_transaction(&event.id.tx_digest)
                .await?
                .checkpoint;
            // Close the range that just ended: it stops one epoch and one
            // checkpoint before the values of the current event.
            range_map.insert(
                start_protocol_version,
                ProtocolVersionSummary {
                    protocol_version: start_protocol_version,
                    epoch_start: start_epoch,
                    epoch_end: curr_epoch - 1,
                    checkpoint_start: start_checkpoint,
                    checkpoint_end: curr_checkpoint.map(|x| x - 1),
                    epoch_change_tx: tx_digest,
                },
            );

            // Open the next range at the current event.
            start_epoch = curr_epoch;
            start_protocol_version = curr_protocol_version;
            tx_digest = event.id.tx_digest;
            start_checkpoint = curr_checkpoint;
        }

        // Insert the still-open range. Its end checkpoint is the checkpoint of
        // the last event's transaction (or of the genesis transaction when
        // there were no events).
        range_map.insert(
            curr_protocol_version,
            ProtocolVersionSummary {
                protocol_version: curr_protocol_version,
                epoch_start: start_epoch,
                epoch_end: curr_epoch,
                checkpoint_start: curr_checkpoint,
                checkpoint_end: self
                    .fetcher
                    .get_transaction(&end_epoch_tx_digest)
                    .await?
                    .checkpoint,
                epoch_change_tx: tx_digest,
            },
        );

        Ok(range_map)
    }
1213
1214    pub fn protocol_version_for_epoch(
1215        epoch: u64,
1216        mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1217    ) -> u64 {
1218        // Naive impl but works for now
1219        // Can improve with range algos & data structures
1220        let mut version = 1;
1221        for (k, v) in mp.iter().rev() {
1222            if v.1 <= epoch {
1223                version = *k;
1224                break;
1225            }
1226        }
1227        version
1228    }
1229
1230    pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1231        self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1232
1233        let system_package_revisions = self.system_package_versions().await?;
1234
1235        // This can be more efficient but small footprint so okay for now
1236        // Table is sorted from earliest to latest
1237        for (
1238            prot_ver,
1239            ProtocolVersionSummary {
1240                epoch_change_tx: tx_digest,
1241                ..
1242            },
1243        ) in self.protocol_version_epoch_table.clone()
1244        {
1245            // Use the previous versions protocol version table
1246            let mut working = if prot_ver <= 1 {
1247                BTreeMap::new()
1248            } else {
1249                self.protocol_version_system_package_table
1250                    .iter()
1251                    .rev()
1252                    .find(|(ver, _)| **ver <= prot_ver)
1253                    .expect("Prev entry must exist")
1254                    .1
1255                    .clone()
1256            };
1257
1258            for (id, versions) in system_package_revisions.iter() {
1259                // Oldest appears first in list, so reverse
1260                for ver in versions.iter().rev() {
1261                    if ver.1 == tx_digest {
1262                        // Found the version for this protocol version
1263                        working.insert(*id, ver.0);
1264                        break;
1265                    }
1266                }
1267            }
1268            self.protocol_version_system_package_table
1269                .insert(prot_ver, working);
1270        }
1271        Ok(())
1272    }
1273
1274    pub async fn system_package_versions(
1275        &self,
1276    ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1277    {
1278        let system_package_ids = Self::system_package_ids(
1279            *self
1280                .protocol_version_epoch_table
1281                .keys()
1282                .peekable()
1283                .last()
1284                .expect("Protocol version epoch table not populated"),
1285        );
1286        let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1287
1288        let mut mapping = BTreeMap::new();
1289
1290        // Extract all the transactions which created or mutated this object
1291        while !system_package_objs.is_empty() {
1292            // For the given object and its version, record the transaction which upgraded
1293            // or created it
1294            let previous_txs: Vec<_> = system_package_objs
1295                .iter()
1296                .map(|o| (o.compute_object_reference(), o.previous_transaction))
1297                .collect();
1298
1299            previous_txs.iter().for_each(|((id, ver, _), tx)| {
1300                mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1301            });
1302
1303            // Next round
1304            // Get the previous version of each object if exists
1305            let previous_ver_refs: Vec<_> = previous_txs
1306                .iter()
1307                .filter_map(|(q, _)| {
1308                    let prev_ver = u64::from(q.1) - 1;
1309                    if prev_ver == 0 {
1310                        None
1311                    } else {
1312                        Some((q.0, SequenceNumber::from(prev_ver)))
1313                    }
1314                })
1315                .collect();
1316            system_package_objs = match self.multi_download(&previous_ver_refs).await {
1317                Ok(packages) => packages,
1318                Err(ReplayEngineError::ObjectNotExist { id }) => {
1319                    // This happens when the RPC server prunes older object
1320                    // Replays in the current protocol version will work but old ones might not
1321                    // as we cannot fetch the package
1322                    warn!(
1323                        "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1324                        id
1325                    );
1326                    break;
1327                }
1328                Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1329                    // This happens when the RPC server prunes older object
1330                    // Replays in the current protocol version will work but old ones might not
1331                    // as we cannot fetch the package
1332                    warn!(
1333                        "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1334                        id, version
1335                    );
1336                    break;
1337                }
1338                Err(ReplayEngineError::ObjectVersionTooHigh {
1339                    id,
1340                    asked_version,
1341                    latest_version,
1342                }) => {
1343                    warn!(
1344                        "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1345                        id, asked_version, latest_version
1346                    );
1347                    break;
1348                }
1349                Err(ReplayEngineError::ObjectDeleted {
1350                    id,
1351                    version,
1352                    digest,
1353                }) => {
1354                    // This happens when the RPC server prunes older object
1355                    // Replays in the current protocol version will work but old ones might not
1356                    // as we cannot fetch the package
1357                    warn!(
1358                        "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1359                        id, version, digest
1360                    );
1361                    break;
1362                }
1363                Err(e) => return Err(e),
1364            };
1365        }
1366        Ok(mapping)
1367    }
1368
1369    pub async fn get_protocol_config(
1370        &self,
1371        epoch_id: EpochId,
1372        chain: Chain,
1373    ) -> Result<ProtocolConfig, ReplayEngineError> {
1374        match self.protocol_version {
1375            Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1376            Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1377            None => self
1378                .protocol_version_epoch_table
1379                .iter()
1380                .rev()
1381                .find(|(_, rg)| epoch_id >= rg.epoch_start)
1382                .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1383                .unwrap_or_else(|| {
1384                    Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1385                }),
1386        }
1387    }
1388
1389    pub async fn checkpoints_for_epoch(
1390        &self,
1391        epoch_id: u64,
1392    ) -> Result<(u64, u64), ReplayEngineError> {
1393        let epoch_change_events = self
1394            .fetcher
1395            .get_epoch_change_events(true)
1396            .await?
1397            .into_iter()
1398            .collect::<Vec<_>>();
1399        let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1400            (0, 1)
1401        } else {
1402            let idx = epoch_change_events
1403                .iter()
1404                .position(|ev| match extract_epoch_and_version(ev.clone()) {
1405                    Ok((epoch, _)) => epoch == epoch_id,
1406                    Err(_) => false,
1407                })
1408                .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1409            let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1410            (
1411                self.fetcher
1412                    .get_transaction(&epoch_change_tx)
1413                    .await?
1414                    .checkpoint
1415                    .unwrap_or_else(|| {
1416                        panic!(
1417                            "Checkpoint for transaction {epoch_change_tx} not present. Could be due to pruning"
1418                        )
1419                    }),
1420                idx,
1421            )
1422        };
1423
1424        let next_epoch_change_tx = epoch_change_events
1425            .get(start_epoch_idx + 1)
1426            .map(|v| v.id.tx_digest)
1427            .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1428
1429        let next_epoch_checkpoint = self
1430            .fetcher
1431            .get_transaction(&next_epoch_change_tx)
1432            .await?
1433            .checkpoint
1434            .unwrap_or_else(|| {
1435                panic!(
1436                    "Checkpoint for transaction {next_epoch_change_tx} not present. Could be due to pruning"
1437                )
1438            });
1439
1440        Ok((start_checkpoint, next_epoch_checkpoint - 1))
1441    }
1442
1443    pub async fn get_epoch_start_timestamp_and_rgp(
1444        &self,
1445        epoch_id: u64,
1446        tx_digest: &TransactionDigest,
1447    ) -> Result<(u64, u64), ReplayEngineError> {
1448        if epoch_id == 0 {
1449            return Err(ReplayEngineError::TransactionNotSupported {
1450                digest: *tx_digest,
1451                reason: "Transactions from epoch 0 not supported".to_string(),
1452            });
1453        }
1454        self.fetcher
1455            .get_epoch_start_timestamp_and_rgp(epoch_id)
1456            .await
1457    }
1458
1459    fn add_config_objects_if_needed(
1460        &self,
1461        status: &IotaExecutionStatus,
1462    ) -> Vec<(ObjectID, SequenceNumber)> {
1463        match parse_effect_error_for_denied_coins(status) {
1464            Some(coin_type) => {
1465                let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1466                    panic!(
1467                        "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1468                    );
1469                };
1470                // NB: the version of the deny list object doesn't matter
1471                if !config_id_and_version
1472                    .iter()
1473                    .any(|(id, _)| id == &IOTA_DENY_LIST_OBJECT_ID)
1474                {
1475                    let deny_list_oid_version = self.download_latest_object(&IOTA_DENY_LIST_OBJECT_ID)
1476                        .ok()
1477                        .flatten()
1478                        .expect("Unable to download the deny list object for a transaction that requires it")
1479                        .version();
1480                    config_id_and_version.push((IOTA_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1481                }
1482                config_id_and_version
1483            }
1484            None => vec![],
1485        }
1486    }
1487
1488    async fn resolve_tx_components(
1489        &self,
1490        tx_digest: &TransactionDigest,
1491    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1492        assert!(self.is_remote_replay());
1493        // Fetch full transaction content
1494        let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1495        let sender = match tx_info.clone().transaction.unwrap().data {
1496            iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.sender,
1497        };
1498        let IotaTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1499
1500        let config_objects = self.add_config_objects_if_needed(effects.status());
1501
1502        let raw_tx_bytes = tx_info.clone().raw_transaction;
1503        let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1504        let input_objs = orig_tx
1505            .transaction_data()
1506            .input_objects()
1507            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1508        let tx_kind_orig = orig_tx.transaction_data().kind();
1509
1510        // Download the objects at the version right before the execution of this TX
1511        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1512
1513        let shared_object_refs: Vec<ObjectRef> = effects
1514            .shared_objects()
1515            .iter()
1516            .map(|so_ref| {
1517                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1518                    unimplemented!(
1519                        "Replay of deleted shared object transactions is not supported yet"
1520                    );
1521                } else {
1522                    so_ref.to_object_ref()
1523                }
1524            })
1525            .collect();
1526        let gas_data = match tx_info.clone().transaction.unwrap().data {
1527            iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.gas_data,
1528        };
1529        let gas_object_refs: Vec<_> = gas_data
1530            .payment
1531            .iter()
1532            .map(|obj_ref| obj_ref.to_object_ref())
1533            .collect();
1534        let receiving_objs = orig_tx
1535            .transaction_data()
1536            .receiving_objects()
1537            .into_iter()
1538            .map(|(obj_id, version, _)| (obj_id, version))
1539            .collect();
1540
1541        let epoch_id = effects.executed_epoch;
1542        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1543
1544        // Extract the epoch start timestamp
1545        let (epoch_start_timestamp, reference_gas_price) = self
1546            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1547            .await?;
1548
1549        Ok(OnChainTransactionInfo {
1550            kind: tx_kind_orig.clone(),
1551            sender,
1552            modified_at_versions,
1553            input_objects: input_objs,
1554            shared_object_refs,
1555            gas: gas_object_refs,
1556            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1557            gas_budget: gas_data.budget,
1558            gas_price: gas_data.price,
1559            executed_epoch: epoch_id,
1560            dependencies: effects.dependencies().to_vec(),
1561            effects: IotaTransactionBlockEffects::V1(effects),
1562            receiving_objs,
1563            config_objects,
1564            // Find the protocol version for this epoch
1565            // This assumes we already initialized the protocol version table
1566            // `protocol_version_epoch_table`
1567            protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1568            tx_digest: *tx_digest,
1569            epoch_start_timestamp,
1570            sender_signed_data: orig_tx.clone(),
1571            reference_gas_price,
1572            chain,
1573        })
1574    }
1575
1576    async fn resolve_tx_components_from_dump(
1577        &self,
1578        tx_digest: &TransactionDigest,
1579    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1580        assert!(!self.is_remote_replay());
1581
1582        let dp = self.fetcher.as_node_state_dump();
1583
1584        let sender = dp
1585            .node_state_dump
1586            .sender_signed_data
1587            .transaction_data()
1588            .sender();
1589        let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1590        let effects = dp.node_state_dump.computed_effects.clone();
1591        let effects = IotaTransactionBlockEffects::try_from(effects).unwrap();
1592        // Config objects don't show up in the node state dump so they need to be
1593        // provided.
1594        let config_objects = self.add_config_objects_if_needed(effects.status());
1595
1596        // Fetch full transaction content
1597        // let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1598
1599        let input_objs = orig_tx
1600            .transaction_data()
1601            .input_objects()
1602            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1603        let tx_kind_orig = orig_tx.transaction_data().kind();
1604
1605        // Download the objects at the version right before the execution of this TX
1606        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1607
1608        let shared_object_refs: Vec<ObjectRef> = effects
1609            .shared_objects()
1610            .iter()
1611            .map(|so_ref| {
1612                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1613                    unimplemented!(
1614                        "Replay of deleted shared object transactions is not supported yet"
1615                    );
1616                } else {
1617                    so_ref.to_object_ref()
1618                }
1619            })
1620            .collect();
1621        let receiving_objs = orig_tx
1622            .transaction_data()
1623            .receiving_objects()
1624            .into_iter()
1625            .map(|(obj_id, version, _)| (obj_id, version))
1626            .collect();
1627
1628        let epoch_id = dp.node_state_dump.executed_epoch;
1629
1630        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1631
1632        let protocol_config =
1633            ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1634        // Extract the epoch start timestamp
1635        let (epoch_start_timestamp, reference_gas_price) = self
1636            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1637            .await?;
1638        let gas_data = orig_tx.transaction_data().gas_data();
1639        let gas_object_refs: Vec<_> = gas_data.clone().payment;
1640
1641        Ok(OnChainTransactionInfo {
1642            kind: tx_kind_orig.clone(),
1643            sender,
1644            modified_at_versions,
1645            input_objects: input_objs,
1646            shared_object_refs,
1647            gas: gas_object_refs,
1648            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1649            gas_budget: gas_data.budget,
1650            gas_price: gas_data.price,
1651            executed_epoch: epoch_id,
1652            dependencies: effects.dependencies().to_vec(),
1653            effects,
1654            receiving_objs,
1655            config_objects,
1656            protocol_version: protocol_config.version,
1657            tx_digest: *tx_digest,
1658            epoch_start_timestamp,
1659            sender_signed_data: orig_tx.clone(),
1660            reference_gas_price,
1661            chain,
1662        })
1663    }
1664
1665    async fn resolve_download_input_objects(
1666        &mut self,
1667        tx_info: &OnChainTransactionInfo,
1668        deleted_shared_objects: Vec<ObjectRef>,
1669    ) -> Result<InputObjects, ReplayEngineError> {
1670        // Download the input objects
1671        let mut package_inputs = vec![];
1672        let mut imm_owned_inputs = vec![];
1673        let mut shared_inputs = vec![];
1674        let mut deleted_shared_info_map = BTreeMap::new();
1675
1676        // for deleted shared objects, we need to look at the transaction dependencies
1677        // to find the correct transaction dependency for a deleted shared
1678        // object.
1679        if !deleted_shared_objects.is_empty() {
1680            for tx_digest in tx_info.dependencies.iter() {
1681                let tx_info = self.resolve_tx_components(tx_digest).await?;
1682                for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1683                    deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1684                }
1685            }
1686        }
1687
1688        tx_info
1689            .input_objects
1690            .iter()
1691            .map(|kind| match kind {
1692                InputObjectKind::MovePackage(i) => {
1693                    package_inputs.push(*i);
1694                    Ok(())
1695                }
1696                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1697                    imm_owned_inputs.push((o_ref.0, o_ref.1));
1698                    Ok(())
1699                }
1700                InputObjectKind::SharedMoveObject {
1701                    id,
1702                    initial_shared_version: _,
1703                    mutable: _,
1704                } if !deleted_shared_info_map.contains_key(id) => {
1705                    // We already downloaded
1706                    if let Some(o) = self
1707                        .storage
1708                        .live_objects_store
1709                        .lock()
1710                        .expect("Can't lock")
1711                        .get(id)
1712                    {
1713                        shared_inputs.push(o.clone());
1714                        Ok(())
1715                    } else {
1716                        Err(ReplayEngineError::InternalCacheInvariantViolation {
1717                            id: *id,
1718                            version: None,
1719                        })
1720                    }
1721                }
1722                _ => Ok(()),
1723            })
1724            .collect::<Result<Vec<_>, _>>()?;
1725
1726        // Download the imm and owned objects
1727        let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1728
1729        // For packages, download latest if non framework
1730        // If framework, download relevant for the current protocol version
1731        in_objs.extend(
1732            self.multi_download_relevant_packages_and_store(
1733                package_inputs,
1734                tx_info.protocol_version.as_u64(),
1735            )
1736            .await?,
1737        );
1738        // Add shared objects
1739        in_objs.extend(shared_inputs);
1740
1741        // TODO(Zhe): Account for cancelled transaction assigned version here, and
1742        // tests.
1743        let resolved_input_objs = tx_info
1744            .input_objects
1745            .iter()
1746            .flat_map(|kind| match kind {
1747                InputObjectKind::MovePackage(i) => {
1748                    // Okay to unwrap since we downloaded it
1749                    Some(ObjectReadResult::new(
1750                        *kind,
1751                        self.storage
1752                            .package_cache
1753                            .lock()
1754                            .expect("Cannot lock")
1755                            .get(i)
1756                            .unwrap_or(
1757                                &self
1758                                    .download_latest_object(i)
1759                                    .expect("Object download failed")
1760                                    .expect("Object not found on chain"),
1761                            )
1762                            .clone()
1763                            .into(),
1764                    ))
1765                }
1766                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1767                    *kind,
1768                    self.storage
1769                        .object_version_cache
1770                        .lock()
1771                        .expect("Cannot lock")
1772                        .get(&(o_ref.0, o_ref.1))
1773                        .unwrap()
1774                        .clone()
1775                        .into(),
1776                )),
1777                InputObjectKind::SharedMoveObject { id, .. }
1778                    if !deleted_shared_info_map.contains_key(id) =>
1779                {
1780                    // we already downloaded
1781                    Some(ObjectReadResult::new(
1782                        *kind,
1783                        self.storage
1784                            .live_objects_store
1785                            .lock()
1786                            .expect("Can't lock")
1787                            .get(id)
1788                            .unwrap()
1789                            .clone()
1790                            .into(),
1791                    ))
1792                }
1793                InputObjectKind::SharedMoveObject { id, .. } => {
1794                    let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1795                    Some(ObjectReadResult::new(
1796                        *kind,
1797                        ObjectReadResultKind::DeletedSharedObject(*version, *digest),
1798                    ))
1799                }
1800            })
1801            .collect();
1802
1803        Ok(InputObjects::new(resolved_input_objs))
1804    }
1805
1806    /// Given the OnChainTransactionInfo, download and store the input objects,
1807    /// and other info necessary for execution
1808    async fn initialize_execution_env_state(
1809        &mut self,
1810        tx_info: &OnChainTransactionInfo,
1811    ) -> Result<InputObjects, ReplayEngineError> {
1812        // We need this for other activities in this session
1813        self.current_protocol_version = tx_info.protocol_version.as_u64();
1814
1815        // Download the objects at the version right before the execution of this TX
1816        self.multi_download_and_store(&tx_info.modified_at_versions)
1817            .await?;
1818
1819        let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1820            .shared_object_refs
1821            .iter()
1822            .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1823
1824        // Download shared objects at the version right before the execution of this TX
1825        let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1826        self.multi_download_and_store(&shared_refs).await?;
1827
1828        // Download gas (although this should already be in cache from modified at
1829        // versions?)
1830        let gas_refs: Vec<_> = tx_info
1831            .gas
1832            .iter()
1833            .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
1834            .collect();
1835        self.multi_download_and_store(&gas_refs).await?;
1836
1837        // Fetch the input objects we know from the raw transaction
1838        let input_objs = self
1839            .resolve_download_input_objects(tx_info, deleted_shared_refs)
1840            .await?;
1841
1842        // Fetch the receiving objects
1843        self.multi_download_and_store(&tx_info.receiving_objs)
1844            .await?;
1845
1846        // Fetch specified config objects if any
1847        self.multi_download_and_store(&tx_info.config_objects)
1848            .await?;
1849
1850        // Prep the object runtime for dynamic fields
1851        // Download the child objects accessed at the version right before the execution
1852        // of this TX
1853        let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1854        self.multi_download_and_store(&loaded_child_refs).await?;
1855        tokio::task::yield_now().await;
1856
1857        Ok(input_objs)
1858    }
1859}
1860
1861// <---------------------  Implement necessary traits for LocalExec to work with
1862// exec engine ----------------------->
1863
1864impl BackingPackageStore for LocalExec {
1865    /// In this case we might need to download a dependency package which was
1866    /// not present in the modified at versions list because packages are
1867    /// immutable
1868    fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1869        fn inner(self_: &LocalExec, package_id: &ObjectID) -> IotaResult<Option<Object>> {
1870            // If package not present fetch it from the network
1871            self_
1872                .get_or_download_object(package_id, true /* we expect a Move package */)
1873                .map_err(|e| IotaError::Storage(e.to_string()))
1874        }
1875
1876        let res = inner(self, package_id);
1877        self.exec_store_events
1878            .lock()
1879            .expect("Unable to lock events list")
1880            .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1881                package_id: *package_id,
1882                result: res.clone(),
1883            });
1884        res.map(|o| o.map(PackageObject::new))
1885    }
1886}
1887
1888impl ChildObjectResolver for LocalExec {
1889    /// This uses `get_object`, which does not download from the network
1890    /// Hence all objects must be in store already
1891    fn read_child_object(
1892        &self,
1893        parent: &ObjectID,
1894        child: &ObjectID,
1895        child_version_upper_bound: SequenceNumber,
1896    ) -> IotaResult<Option<Object>> {
1897        fn inner(
1898            self_: &LocalExec,
1899            parent: &ObjectID,
1900            child: &ObjectID,
1901            child_version_upper_bound: SequenceNumber,
1902        ) -> IotaResult<Option<Object>> {
1903            let child_object =
1904                match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1905                    None => return Ok(None),
1906                    Some(o) => o,
1907                };
1908            let child_version = child_object.version();
1909            if child_object.version() > child_version_upper_bound {
1910                return Err(IotaError::Unknown(format!(
1911                    "Invariant Violation. Replay loaded child_object {child} at version \
1912                    {child_version} but expected the version to be <= {child_version_upper_bound}"
1913                )));
1914            }
1915            let parent = *parent;
1916            if child_object.owner != Owner::ObjectOwner(parent.into()) {
1917                return Err(IotaError::InvalidChildObjectAccess {
1918                    object: *child,
1919                    given_parent: parent,
1920                    actual_owner: child_object.owner,
1921                });
1922            }
1923            Ok(Some(child_object))
1924        }
1925
1926        let res = inner(self, parent, child, child_version_upper_bound);
1927        self.exec_store_events
1928            .lock()
1929            .expect("Unable to lock events list")
1930            .push(
1931                ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1932                    parent: *parent,
1933                    child: *child,
1934                    result: res.clone(),
1935                },
1936            );
1937        res
1938    }
1939
1940    fn get_object_received_at_version(
1941        &self,
1942        owner: &ObjectID,
1943        receiving_object_id: &ObjectID,
1944        receive_object_at_version: SequenceNumber,
1945        _epoch_id: EpochId,
1946    ) -> IotaResult<Option<Object>> {
1947        fn inner(
1948            self_: &LocalExec,
1949            owner: &ObjectID,
1950            receiving_object_id: &ObjectID,
1951            receive_object_at_version: SequenceNumber,
1952        ) -> IotaResult<Option<Object>> {
1953            let recv_object = match self_.try_get_object(receiving_object_id)? {
1954                None => return Ok(None),
1955                Some(o) => o,
1956            };
1957            if recv_object.version() != receive_object_at_version {
1958                return Err(IotaError::Unknown(format!(
1959                    "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
1960                    {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
1961                )));
1962            }
1963            if recv_object.owner != Owner::AddressOwner((*owner).into()) {
1964                return Ok(None);
1965            }
1966            Ok(Some(recv_object))
1967        }
1968
1969        let res = inner(self, owner, receiving_object_id, receive_object_at_version);
1970        self.exec_store_events
1971            .lock()
1972            .expect("Unable to lock events list")
1973            .push(ExecutionStoreEvent::ReceiveObject {
1974                owner: *owner,
1975                receive: *receiving_object_id,
1976                receive_at_version: receive_object_at_version,
1977                result: res.clone(),
1978            });
1979        res
1980    }
1981}
1982
1983impl ResourceResolver for LocalExec {
1984    type Error = IotaError;
1985
1986    /// In this case we might need to download a Move object on the fly which
1987    /// was not present in the modified at versions list because packages
1988    /// are immutable
1989    fn get_resource(
1990        &self,
1991        address: &AccountAddress,
1992        type_: &StructTag,
1993    ) -> IotaResult<Option<Vec<u8>>> {
1994        fn inner(
1995            self_: &LocalExec,
1996            address: &AccountAddress,
1997            type_: &StructTag,
1998        ) -> IotaResult<Option<Vec<u8>>> {
1999            // If package not present fetch it from the network or some remote location
2000            let Some(object) = self_.get_or_download_object(
2001                &ObjectID::from(*address),
2002                false, // we expect a Move obj
2003            )?
2004            else {
2005                return Ok(None);
2006            };
2007
2008            match &object.data {
2009                Data::Move(m) => {
2010                    assert!(
2011                        m.is_type(type_),
2012                        "Invariant violation: ill-typed object in storage \
2013                        or bad object request from caller"
2014                    );
2015                    Ok(Some(m.contents().to_vec()))
2016                }
2017                other => unimplemented!(
2018                    "Bad object lookup: expected Move object, but got {:#?}",
2019                    other
2020                ),
2021            }
2022        }
2023
2024        let res = inner(self, address, type_);
2025        self.exec_store_events
2026            .lock()
2027            .expect("Unable to lock events list")
2028            .push(ExecutionStoreEvent::ResourceResolverGetResource {
2029                address: *address,
2030                typ: type_.clone(),
2031                result: res.clone(),
2032            });
2033        res
2034    }
2035}
2036
2037impl ModuleResolver for LocalExec {
2038    type Error = IotaError;
2039
2040    /// This fetches a module which must already be present in the store
2041    /// We do not download
2042    fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2043        fn inner(self_: &LocalExec, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2044            get_module(self_, module_id)
2045        }
2046
2047        let res = inner(self, module_id);
2048        self.exec_store_events
2049            .lock()
2050            .expect("Unable to lock events list")
2051            .push(ExecutionStoreEvent::ModuleResolverGetModule {
2052                module_id: module_id.clone(),
2053                result: res.clone(),
2054            });
2055        res
2056    }
2057}
2058
2059impl ModuleResolver for &mut LocalExec {
2060    type Error = IotaError;
2061
2062    fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2063        // Recording event here will be double-counting since its already recorded in
2064        // the get_module fn
2065        (**self).get_module(module_id)
2066    }
2067}
2068
2069impl ObjectStore for LocalExec {
2070    /// The object must be present in store by normal process we used to
2071    /// backfill store in init We dont download if not present
2072    fn try_get_object(
2073        &self,
2074        object_id: &ObjectID,
2075    ) -> iota_types::storage::error::Result<Option<Object>> {
2076        let res = self
2077            .storage
2078            .live_objects_store
2079            .lock()
2080            .expect("Can't lock")
2081            .get(object_id)
2082            .cloned();
2083        self.exec_store_events
2084            .lock()
2085            .expect("Unable to lock events list")
2086            .push(ExecutionStoreEvent::ObjectStoreGetObject {
2087                object_id: *object_id,
2088                result: Ok(res.clone()),
2089            });
2090        Ok(res)
2091    }
2092
2093    /// The object must be present in store by normal process we used to
2094    /// backfill store in init We dont download if not present
2095    fn try_get_object_by_key(
2096        &self,
2097        object_id: &ObjectID,
2098        version: VersionNumber,
2099    ) -> iota_types::storage::error::Result<Option<Object>> {
2100        let res = self
2101            .storage
2102            .live_objects_store
2103            .lock()
2104            .expect("Can't lock")
2105            .get(object_id)
2106            .and_then(|obj| {
2107                if obj.version() == version {
2108                    Some(obj.clone())
2109                } else {
2110                    None
2111                }
2112            });
2113
2114        self.exec_store_events
2115            .lock()
2116            .expect("Unable to lock events list")
2117            .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2118                object_id: *object_id,
2119                version,
2120                result: Ok(res.clone()),
2121            });
2122
2123        Ok(res)
2124    }
2125}
2126
2127impl ObjectStore for &mut LocalExec {
2128    fn try_get_object(
2129        &self,
2130        object_id: &ObjectID,
2131    ) -> iota_types::storage::error::Result<Option<Object>> {
2132        // Recording event here will be double-counting since its already recorded in
2133        // the get_module fn
2134        (**self).try_get_object(object_id)
2135    }
2136
2137    fn try_get_object_by_key(
2138        &self,
2139        object_id: &ObjectID,
2140        version: VersionNumber,
2141    ) -> iota_types::storage::error::Result<Option<Object>> {
2142        // Recording event here will be double-counting since its already recorded in
2143        // the get_module fn
2144        (**self).try_get_object_by_key(object_id, version)
2145    }
2146}
2147
2148impl GetModule for LocalExec {
2149    type Error = IotaError;
2150    type Item = CompiledModule;
2151
2152    fn get_module_by_id(&self, id: &ModuleId) -> IotaResult<Option<Self::Item>> {
2153        let res = get_module_by_id(self, id);
2154
2155        self.exec_store_events
2156            .lock()
2157            .expect("Unable to lock events list")
2158            .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2159                id: id.clone(),
2160                result: res.clone(),
2161            });
2162        res
2163    }
2164}
2165
2166// <--------------------- Util functions ----------------------->
2167
2168pub fn get_executor(
2169    executor_version_override: Option<i64>,
2170    protocol_config: &ProtocolConfig,
2171    _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2172    enable_profiler: Option<PathBuf>,
2173) -> Arc<dyn Executor + Send + Sync> {
2174    let protocol_config = executor_version_override
2175        .map(|q| {
2176            let ver = if q < 0 {
2177                ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2178            } else {
2179                q as u64
2180            };
2181
2182            let mut c = protocol_config.clone();
2183            c.set_execution_version_for_testing(ver);
2184            c
2185        })
2186        .unwrap_or(protocol_config.clone());
2187
2188    let silent = true;
2189    iota_execution::executor(&protocol_config, silent, enable_profiler)
2190        .expect("Creating an executor should not fail here")
2191}
2192
2193fn parse_effect_error_for_denied_coins(status: &IotaExecutionStatus) -> Option<String> {
2194    let IotaExecutionStatus::Failure { error } = status else {
2195        return None;
2196    };
2197    parse_denied_error_string(error)
2198}
2199
2200fn parse_denied_error_string(error: &str) -> Option<String> {
2201    let regulated_regex = regex::Regex::new(
2202        r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2203    )
2204    .unwrap();
2205
2206    let caps = regulated_regex.captures(error)?;
2207    Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2208}
2209
#[cfg(test)]
mod tests {
    use super::parse_denied_error_string;

    /// Both supported error shapes must yield the quoted coin type.
    #[test]
    fn test_regex_regulated_coin_errors() {
        let expected_coin_type =
            "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";
        let error_messages = [
            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
        ];

        for message in error_messages {
            let parsed = parse_denied_error_string(message).unwrap();
            assert!(parsed == expected_coin_type);
        }
    }
}