iota_replay/
replay.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashSet},
7    path::PathBuf,
8    sync::{Arc, Mutex},
9};
10
11use futures::executor::block_on;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_core::authority::NodeStateDump;
14use iota_execution::Executor;
15use iota_framework::BuiltInFramework;
16use iota_json_rpc_types::{
17    IotaExecutionStatus, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI,
18};
19use iota_protocol_config::{Chain, ProtocolConfig};
20use iota_sdk::{IotaClient, IotaClientBuilder};
21use iota_types::{
22    IOTA_DENY_LIST_OBJECT_ID,
23    base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
24    committee::EpochId,
25    digests::{ObjectDigest, TransactionDigest},
26    error::{ExecutionError, IotaError, IotaResult},
27    executable_transaction::VerifiedExecutableTransaction,
28    gas::IotaGasStatus,
29    in_memory_storage::InMemoryStorage,
30    inner_temporary_store::InnerTemporaryStore,
31    message_envelope::Message,
32    metrics::LimitsMetrics,
33    object::{Data, Object, Owner},
34    storage::{
35        BackingPackageStore, ChildObjectResolver, ObjectStore, PackageObject, get_module,
36        get_module_by_id,
37    },
38    transaction::{
39        CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
40        SenderSignedData, Transaction, TransactionDataAPI, TransactionKind,
41        TransactionKind::ProgrammableTransaction, VerifiedTransaction,
42    },
43};
44use move_binary_format::CompiledModule;
45use move_bytecode_utils::module_cache::GetModule;
46use move_core_types::{
47    account_address::AccountAddress,
48    language_storage::{ModuleId, StructTag},
49    resolver::{ModuleResolver, ResourceResolver},
50};
51use prometheus::Registry;
52use serde::{Deserialize, Serialize};
53use similar::{ChangeTag, TextDiff};
54use tracing::{error, info, trace, warn};
55
56use crate::{
57    chain_from_chain_id,
58    data_fetcher::{
59        DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
60    },
61    displays::{
62        Pretty,
63        transaction_displays::{FullPTB, transform_command_results_to_annotated},
64    },
65    types::*,
66};
67
68// TODO: add persistent cache. But perf is good enough already.
69
70#[derive(Debug, Serialize, Deserialize)]
71pub struct ExecutionSandboxState {
72    /// Information describing the transaction
73    pub transaction_info: OnChainTransactionInfo,
74    /// All the objects that are required for the execution of the transaction
75    pub required_objects: Vec<Object>,
76    /// Temporary store from executing this locally in
77    /// `execute_transaction_to_effects`
78    #[serde(skip)]
79    pub local_exec_temporary_store: Option<InnerTemporaryStore>,
80    /// Effects from executing this locally in `execute_transaction_to_effects`
81    pub local_exec_effects: IotaTransactionBlockEffects,
82    /// Status from executing this locally in `execute_transaction_to_effects`
83    #[serde(skip)]
84    pub local_exec_status: Option<Result<(), ExecutionError>>,
85}
86
87impl ExecutionSandboxState {
88    pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
89        if self.transaction_info.effects != self.local_exec_effects {
90            error!("Replay tool forked {}", self.transaction_info.tx_digest);
91            let diff = self.diff_effects();
92            println!("{diff}");
93            return Err(ReplayEngineError::EffectsForked {
94                digest: self.transaction_info.tx_digest,
95                diff: format!("\n{diff}"),
96                on_chain: Box::new(self.transaction_info.effects.clone()),
97                local: Box::new(self.local_exec_effects.clone()),
98            });
99        }
100        Ok(())
101    }
102
103    /// Utility to diff effects in a human readable format
104    pub fn diff_effects(&self) -> String {
105        let eff1 = &self.transaction_info.effects;
106        let eff2 = &self.local_exec_effects;
107        let on_chain_str = format!("{eff1:#?}");
108        let local_chain_str = format!("{eff2:#?}");
109        let mut res = vec![];
110
111        let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
112        for change in diff.iter_all_changes() {
113            let sign = match change.tag() {
114                ChangeTag::Delete => "---",
115                ChangeTag::Insert => "+++",
116                ChangeTag::Equal => "   ",
117            };
118            res.push(format!("{sign}{change}"));
119        }
120
121        res.join("")
122    }
123}
124
125#[derive(Debug, Clone, PartialEq, Eq)]
126pub struct ProtocolVersionSummary {
127    /// Protocol version at this point
128    pub protocol_version: u64,
129    /// The first epoch that uses this protocol version
130    pub epoch_start: u64,
131    /// The last epoch that uses this protocol version
132    pub epoch_end: u64,
133    /// The first checkpoint in this protocol v ersion
134    pub checkpoint_start: Option<u64>,
135    /// The last checkpoint in this protocol version
136    pub checkpoint_end: Option<u64>,
137    /// The transaction which triggered this epoch change
138    pub epoch_change_tx: TransactionDigest,
139}
140
141#[derive(Clone)]
142pub struct Storage {
143    /// These are objects at the frontier of the execution's view
144    /// They might not be the latest object currently but they are the latest
145    /// objects for the TX at the time it was run
146    /// This store cannot be shared between runners
147    pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
148
149    /// Package cache and object version cache can be shared between runners
150    /// Non system packages are immutable so we can cache these
151    pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
152    /// Object contents are frozen at their versions so we can cache these
153    /// We must place system packages here as well
154    pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
155}
156
157impl std::fmt::Display for Storage {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        writeln!(f, "Live object store")?;
160        for (id, obj) in self
161            .live_objects_store
162            .lock()
163            .expect("Unable to lock")
164            .iter()
165        {
166            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
167        }
168        writeln!(f, "Package cache")?;
169        for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
170            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
171        }
172        writeln!(f, "Object version cache")?;
173        for (id, _) in self
174            .object_version_cache
175            .lock()
176            .expect("Unable to lock")
177            .iter()
178        {
179            writeln!(f, "{}: {}", id.0, id.1)?;
180        }
181
182        write!(f, "")
183    }
184}
185
186impl Storage {
187    pub fn default() -> Self {
188        Self {
189            live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
190            package_cache: Arc::new(Mutex::new(BTreeMap::new())),
191            object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
192        }
193    }
194
195    pub fn all_objects(&self) -> Vec<Object> {
196        self.live_objects_store
197            .lock()
198            .expect("Unable to lock")
199            .values()
200            .cloned()
201            .chain(
202                self.package_cache
203                    .lock()
204                    .expect("Unable to lock")
205                    .iter()
206                    .map(|(_, obj)| obj.clone()),
207            )
208            .chain(
209                self.object_version_cache
210                    .lock()
211                    .expect("Unable to lock")
212                    .iter()
213                    .map(|(_, obj)| obj.clone()),
214            )
215            .collect::<Vec<_>>()
216    }
217}
218
219#[derive(Clone)]
220pub struct LocalExec {
221    pub client: Option<IotaClient>,
222    // For a given protocol version, what TX created it, and what is the valid range of epochs
223    // at this protocol version.
224    pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
225    // For a given protocol version, the mapping valid sequence numbers for each framework package
226    pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
227    // The current protocol version for this execution
228    pub current_protocol_version: u64,
229    // All state is contained here
230    pub storage: Storage,
231    // Debug events
232    pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
233    // Debug events
234    pub metrics: Arc<LimitsMetrics>,
235    // Used for fetching data from the network or remote store
236    pub fetcher: Fetchers,
237
238    // One can optionally override the executor version
239    // -1 implies use latest version
240    pub executor_version: Option<i64>,
241    // One can optionally override the protocol version
242    // -1 implies use latest version
243    // None implies use the protocol version at the time of execution
244    pub protocol_version: Option<i64>,
245    // Whether or not to enable the gas profiler, the PathBuf contains either a user specified
246    // filepath or the default current directory and name format for the profile output
247    pub enable_profiler: Option<PathBuf>,
248    pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
249    // Retry policies due to RPC errors
250    pub num_retries_for_timeout: u32,
251    pub sleep_period_for_timeout: std::time::Duration,
252}
253
254impl LocalExec {
255    /// Wrapper around fetcher in case we want to add more functionality
256    /// Such as fetching from local DB from snapshot
257    pub async fn multi_download(
258        &self,
259        objs: &[(ObjectID, SequenceNumber)],
260    ) -> Result<Vec<Object>, ReplayEngineError> {
261        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
262        while num_retries_for_timeout >= 0 {
263            match self.fetcher.multi_get_versioned(objs).await {
264                Ok(objs) => return Ok(objs),
265                Err(ReplayEngineError::IotaRpcRequestTimeout) => {
266                    warn!(
267                        "RPC request timed out. Retries left {}. Sleeping for {}s",
268                        num_retries_for_timeout,
269                        self.sleep_period_for_timeout.as_secs()
270                    );
271                    num_retries_for_timeout -= 1;
272                    tokio::time::sleep(self.sleep_period_for_timeout).await;
273                }
274                Err(e) => return Err(e),
275            }
276        }
277        Err(ReplayEngineError::IotaRpcRequestTimeout)
278    }
279    /// Wrapper around fetcher in case we want to add more functionality
280    /// Such as fetching from local DB from snapshot
281    pub async fn multi_download_latest(
282        &self,
283        objs: &[ObjectID],
284    ) -> Result<Vec<Object>, ReplayEngineError> {
285        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
286        while num_retries_for_timeout >= 0 {
287            match self.fetcher.multi_get_latest(objs).await {
288                Ok(objs) => return Ok(objs),
289                Err(ReplayEngineError::IotaRpcRequestTimeout) => {
290                    warn!(
291                        "RPC request timed out. Retries left {}. Sleeping for {}s",
292                        num_retries_for_timeout,
293                        self.sleep_period_for_timeout.as_secs()
294                    );
295                    num_retries_for_timeout -= 1;
296                    tokio::time::sleep(self.sleep_period_for_timeout).await;
297                }
298                Err(e) => return Err(e),
299            }
300        }
301        Err(ReplayEngineError::IotaRpcRequestTimeout)
302    }
303
304    pub async fn fetch_loaded_child_refs(
305        &self,
306        tx_digest: &TransactionDigest,
307    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
308        // Get the child objects loaded
309        self.fetcher.get_loaded_child_objects(tx_digest).await
310    }
311
312    pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
313        Self::new_for_remote(
314            IotaClientBuilder::default()
315                .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
316                .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
317                .build(http_url)
318                .await?,
319            None,
320        )
321        .await
322    }
323
324    pub async fn replay_with_network_config(
325        rpc_url: String,
326        tx_digest: TransactionDigest,
327        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
328        use_authority: bool,
329        executor_version: Option<i64>,
330        protocol_version: Option<i64>,
331        enable_profiler: Option<PathBuf>,
332        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
333    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
334        info!("Using RPC URL: {}", rpc_url);
335        LocalExec::new_from_fn_url(&rpc_url)
336            .await?
337            .init_for_execution()
338            .await?
339            .execute_transaction(
340                &tx_digest,
341                expensive_safety_check_config,
342                use_authority,
343                executor_version,
344                protocol_version,
345                enable_profiler,
346                config_and_versions,
347            )
348            .await
349    }
350
351    /// This captures the state of the network at a given point in time and
352    /// populates prptocol version tables including which system packages to
353    /// fetch If this function is called across epoch boundaries, the info
354    /// might be stale. But it should only be called once per epoch.
355    pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
356        self.populate_protocol_version_tables().await?;
357        tokio::task::yield_now().await;
358        Ok(self)
359    }
360
361    pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
362        Self::new_for_remote(
363            self.client.expect("Remote client not initialized"),
364            Some(self.fetcher.into_remote()),
365        )
366        .await?
367        .init_for_execution()
368        .await
369    }
370
371    pub async fn new_for_remote(
372        client: IotaClient,
373        remote_fetcher: Option<RemoteFetcher>,
374    ) -> Result<Self, ReplayEngineError> {
375        // Use a throwaway metrics registry for local execution.
376        let registry = prometheus::Registry::new();
377        let metrics = Arc::new(LimitsMetrics::new(&registry));
378
379        let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
380
381        Ok(Self {
382            client: Some(client),
383            protocol_version_epoch_table: BTreeMap::new(),
384            protocol_version_system_package_table: BTreeMap::new(),
385            current_protocol_version: 0,
386            exec_store_events: Arc::new(Mutex::new(Vec::new())),
387            metrics,
388            storage: Storage::default(),
389            fetcher: Fetchers::Remote(fetcher),
390            // TODO: make these configurable
391            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
392            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
393            executor_version: None,
394            protocol_version: None,
395            enable_profiler: None,
396            config_and_versions: None,
397        })
398    }
399
400    pub async fn new_for_state_dump(
401        path: &str,
402        backup_rpc_url: Option<String>,
403    ) -> Result<Self, ReplayEngineError> {
404        // Use a throwaway metrics registry for local execution.
405        let registry = prometheus::Registry::new();
406        let metrics = Arc::new(LimitsMetrics::new(&registry));
407
408        let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
409        let current_protocol_version = state.protocol_version;
410        let fetcher = match backup_rpc_url {
411            Some(url) => NodeStateDumpFetcher::new(
412                state,
413                Some(RemoteFetcher::new(
414                    IotaClientBuilder::default()
415                        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
416                        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
417                        .build(url)
418                        .await?,
419                )),
420            ),
421            None => NodeStateDumpFetcher::new(state, None),
422        };
423
424        Ok(Self {
425            client: None,
426            protocol_version_epoch_table: BTreeMap::new(),
427            protocol_version_system_package_table: BTreeMap::new(),
428            current_protocol_version,
429            exec_store_events: Arc::new(Mutex::new(Vec::new())),
430            metrics,
431            storage: Storage::default(),
432            fetcher: Fetchers::NodeStateDump(fetcher),
433            // TODO: make these configurable
434            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
435            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
436            executor_version: None,
437            protocol_version: None,
438            enable_profiler: None,
439            config_and_versions: None,
440        })
441    }
442
443    pub async fn multi_download_and_store(
444        &mut self,
445        objs: &[(ObjectID, SequenceNumber)],
446    ) -> Result<Vec<Object>, ReplayEngineError> {
447        let objs = self.multi_download(objs).await?;
448
449        // Backfill the store
450        for obj in objs.iter() {
451            let o_ref = obj.compute_object_reference();
452            self.storage
453                .live_objects_store
454                .lock()
455                .expect("Can't lock")
456                .insert(o_ref.0, obj.clone());
457            self.storage
458                .object_version_cache
459                .lock()
460                .expect("Cannot lock")
461                .insert((o_ref.0, o_ref.1), obj.clone());
462            if obj.is_package() {
463                self.storage
464                    .package_cache
465                    .lock()
466                    .expect("Cannot lock")
467                    .insert(o_ref.0, obj.clone());
468            }
469        }
470        tokio::task::yield_now().await;
471        Ok(objs)
472    }
473
474    pub async fn multi_download_relevant_packages_and_store(
475        &mut self,
476        objs: Vec<ObjectID>,
477        protocol_version: u64,
478    ) -> Result<Vec<Object>, ReplayEngineError> {
479        let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
480            BuiltInFramework::genesis_objects().collect()
481        } else {
482            let syst_packages =
483                self.system_package_versions_for_protocol_version(protocol_version)?;
484            self.multi_download(&syst_packages).await?
485        };
486
487        // Download latest version of all packages that are not system packages
488        // This is okay since the versions can never change
489        let non_system_package_objs: Vec<_> = objs
490            .into_iter()
491            .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
492            .collect();
493        let objs = self
494            .multi_download_latest(&non_system_package_objs)
495            .await?
496            .into_iter()
497            .chain(syst_packages_objs.into_iter());
498
499        for obj in objs.clone() {
500            let o_ref = obj.compute_object_reference();
501            // We dont always want the latest in store
502            // self.storage.store.insert(o_ref.0, obj.clone());
503            self.storage
504                .object_version_cache
505                .lock()
506                .expect("Cannot lock")
507                .insert((o_ref.0, o_ref.1), obj.clone());
508            if obj.is_package() {
509                self.storage
510                    .package_cache
511                    .lock()
512                    .expect("Cannot lock")
513                    .insert(o_ref.0, obj.clone());
514            }
515        }
516        Ok(objs.collect())
517    }
518
519    // TODO: remove this after `futures::executor::block_on` is removed.
520    #[expect(clippy::disallowed_methods)]
521    pub fn download_object(
522        &self,
523        object_id: &ObjectID,
524        version: SequenceNumber,
525    ) -> Result<Object, ReplayEngineError> {
526        if self
527            .storage
528            .object_version_cache
529            .lock()
530            .expect("Cannot lock")
531            .contains_key(&(*object_id, version))
532        {
533            return Ok(self
534                .storage
535                .object_version_cache
536                .lock()
537                .expect("Cannot lock")
538                .get(&(*object_id, version))
539                .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
540                    id: *object_id,
541                    version: Some(version),
542                })?
543                .clone());
544        }
545
546        let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
547            q.pop().unwrap_or_else(|| {
548                panic!(
549                    "Downloaded obj response cannot be empty {:?}",
550                    (*object_id, version)
551                )
552            })
553        })?;
554
555        let o_ref = o.compute_object_reference();
556        self.storage
557            .object_version_cache
558            .lock()
559            .expect("Cannot lock")
560            .insert((o_ref.0, o_ref.1), o.clone());
561        Ok(o)
562    }
563
564    // TODO: remove this after `futures::executor::block_on` is removed.
565    #[expect(clippy::disallowed_methods)]
566    pub fn download_latest_object(
567        &self,
568        object_id: &ObjectID,
569    ) -> Result<Option<Object>, ReplayEngineError> {
570        let resp = block_on({
571            // info!("Downloading latest object {object_id}");
572            self.multi_download_latest(&[*object_id])
573        })
574        .map(|mut q| {
575            q.pop()
576                .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
577        });
578
579        match resp {
580            Ok(v) => Ok(Some(v)),
581            Err(ReplayEngineError::ObjectNotExist { id }) => {
582                error!(
583                    "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
584                );
585                Ok(None)
586            }
587            Err(ReplayEngineError::ObjectDeleted {
588                id,
589                version,
590                digest,
591            }) => {
592                error!("Object {id} {version} {digest} was deleted on RPC server.");
593                Ok(None)
594            }
595            Err(err) => Err(ReplayEngineError::IotaRpcError {
596                err: err.to_string(),
597            }),
598        }
599    }
600
601    #[expect(clippy::disallowed_methods)]
602    pub fn download_object_by_upper_bound(
603        &self,
604        object_id: &ObjectID,
605        version_upper_bound: VersionNumber,
606    ) -> Result<Option<Object>, ReplayEngineError> {
607        let local_object = self
608            .storage
609            .live_objects_store
610            .lock()
611            .expect("Can't lock")
612            .get(object_id)
613            .cloned();
614        if local_object.is_some() {
615            return Ok(local_object);
616        }
617        let response = block_on({
618            self.fetcher
619                .get_child_object(object_id, version_upper_bound)
620        });
621        match response {
622            Ok(object) => {
623                let obj_ref = object.compute_object_reference();
624                self.storage
625                    .live_objects_store
626                    .lock()
627                    .expect("Can't lock")
628                    .insert(*object_id, object.clone());
629                self.storage
630                    .object_version_cache
631                    .lock()
632                    .expect("Can't lock")
633                    .insert((obj_ref.0, obj_ref.1), object.clone());
634                Ok(Some(object))
635            }
636            Err(ReplayEngineError::ObjectNotExist { id }) => {
637                error!(
638                    "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
639                );
640                Ok(None)
641            }
642            Err(ReplayEngineError::ObjectDeleted {
643                id,
644                version,
645                digest,
646            }) => {
647                error!("Object {id} {version} {digest} was deleted on RPC server.");
648                Ok(None)
649            }
650            // This is a child object which was not found in the store (e.g., due to exists
651            // check before creating the dynamic field).
652            Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
653                info!(
654                    "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
655                );
656                Ok(None)
657            }
658            Err(err) => Err(ReplayEngineError::IotaRpcError {
659                err: err.to_string(),
660            }),
661        }
662    }
663
664    pub async fn get_checkpoint_txs(
665        &self,
666        checkpoint_id: u64,
667    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
668        self.fetcher
669            .get_checkpoint_txs(checkpoint_id)
670            .await
671            .map_err(|e| ReplayEngineError::IotaRpcError { err: e.to_string() })
672    }
673
674    pub async fn execute_all_in_checkpoints(
675        &mut self,
676        checkpoint_ids: &[u64],
677        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
678        terminate_early: bool,
679        use_authority: bool,
680    ) -> Result<(u64, u64), ReplayEngineError> {
681        // Get all the TXs at this checkpoint
682        let mut txs = Vec::new();
683        for checkpoint_id in checkpoint_ids {
684            txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
685        }
686        let num = txs.len();
687        let mut succeeded = 0;
688        for tx in txs {
689            match self
690                .execute_transaction(
691                    &tx,
692                    expensive_safety_check_config.clone(),
693                    use_authority,
694                    None,
695                    None,
696                    None,
697                    None,
698                )
699                .await
700                .map(|q| q.check_effects())
701            {
702                Err(e) | Ok(Err(e)) => {
703                    if terminate_early {
704                        return Err(e);
705                    }
706                    error!("Error executing tx: {},  {:#?}", tx, e);
707                    continue;
708                }
709                _ => (),
710            }
711
712            succeeded += 1;
713        }
714        Ok((succeeded, num as u64))
715    }
716
717    pub async fn execution_engine_execute_with_tx_info_impl(
718        &mut self,
719        tx_info: &OnChainTransactionInfo,
720        override_transaction_kind: Option<TransactionKind>,
721        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
722    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
723        let tx_digest = &tx_info.tx_digest;
724
725        // Initialize the state necessary for execution
726        // Get the input objects
727        let input_objects = self.initialize_execution_env_state(tx_info).await?;
728        assert_eq!(
729            &input_objects.filter_shared_objects().len(),
730            &tx_info.shared_object_refs.len()
731        );
732        // At this point we have all the objects needed for replay
733
734        // This assumes we already initialized the protocol version table
735        // `protocol_version_epoch_table`
736        let protocol_config =
737            &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);
738
739        let metrics = self.metrics.clone();
740
741        let ov = self.executor_version;
742
743        // We could probably cache the executor per protocol config
744        let executor = get_executor(
745            ov,
746            protocol_config,
747            expensive_safety_check_config,
748            self.enable_profiler.clone(),
749        );
750
751        // All prep done
752        let expensive_checks = true;
753        let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
754        let certificate_deny_set = HashSet::new();
755        let gas_status = if tx_info.kind.is_system_tx() {
756            IotaGasStatus::new_unmetered()
757        } else {
758            IotaGasStatus::new(
759                tx_info.gas_budget,
760                tx_info.gas_price,
761                tx_info.reference_gas_price,
762                protocol_config,
763            )
764            .expect("Failed to create gas status")
765        };
766        let (inner_store, gas_status, effects, result) = executor.execute_transaction_to_effects(
767            &self,
768            protocol_config,
769            metrics.clone(),
770            expensive_checks,
771            &certificate_deny_set,
772            &tx_info.executed_epoch,
773            tx_info.epoch_start_timestamp,
774            CheckedInputObjects::new_for_replay(input_objects.clone()),
775            tx_info.gas.clone(),
776            gas_status,
777            transaction_kind.clone(),
778            tx_info.sender,
779            *tx_digest,
780            &mut None,
781        );
782
783        if let Err(err) = self.pretty_print_for_tracing(
784            &gas_status,
785            &executor,
786            tx_info,
787            &transaction_kind,
788            protocol_config,
789            metrics,
790            expensive_checks,
791            input_objects.clone(),
792        ) {
793            error!("Failed to pretty print for tracing: {:?}", err);
794        }
795
796        let all_required_objects = self.storage.all_objects();
797
798        let effects =
799            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
800
801        Ok(ExecutionSandboxState {
802            transaction_info: tx_info.clone(),
803            required_objects: all_required_objects,
804            local_exec_temporary_store: Some(inner_store),
805            local_exec_effects: effects,
806            local_exec_status: Some(result),
807        })
808    }
809
810    fn pretty_print_for_tracing(
811        &self,
812        gas_status: &IotaGasStatus,
813        executor: &Arc<dyn Executor + Send + Sync>,
814        tx_info: &OnChainTransactionInfo,
815        transaction_kind: &TransactionKind,
816        protocol_config: &ProtocolConfig,
817        metrics: Arc<LimitsMetrics>,
818        expensive_checks: bool,
819        input_objects: InputObjects,
820    ) -> anyhow::Result<()> {
821        trace!(target: "replay_gas_info", "{}", Pretty(gas_status));
822
823        let skip_checks = true;
824        if let ProgrammableTransaction(pt) = transaction_kind {
825            trace!(
826                target: "replay_ptb_info",
827                "{}",
828                Pretty(&FullPTB {
829                    ptb: pt.clone(),
830                    results: transform_command_results_to_annotated(
831                        executor,
832                        &self.clone(),
833                        executor.dev_inspect_transaction(
834                            &self,
835                            protocol_config,
836                            metrics,
837                            expensive_checks,
838                            &HashSet::new(),
839                            &tx_info.executed_epoch,
840                            tx_info.epoch_start_timestamp,
841                            CheckedInputObjects::new_for_replay(input_objects),
842                            tx_info.gas.clone(),
843                            IotaGasStatus::new(
844                                tx_info.gas_budget,
845                                tx_info.gas_price,
846                                tx_info.reference_gas_price,
847                                protocol_config,
848                            )?,
849                            transaction_kind.clone(),
850                            tx_info.sender,
851                            tx_info.sender_signed_data.digest(),
852                            skip_checks,
853                        )
854                        .3
855                        .unwrap_or_default(),
856                    )?,
857            }));
858        }
859        Ok(())
860    }
861
862    /// Must be called after `init_for_execution`
863    pub async fn execution_engine_execute_impl(
864        &mut self,
865        tx_digest: &TransactionDigest,
866        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
867    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
868        if self.is_remote_replay() {
869            assert!(
870                !self.protocol_version_system_package_table.is_empty()
871                    || !self.protocol_version_epoch_table.is_empty(),
872                "Required tables not populated. Must call `init_for_execution` before executing transactions"
873            );
874        }
875
876        let tx_info = if self.is_remote_replay() {
877            self.resolve_tx_components(tx_digest).await?
878        } else {
879            self.resolve_tx_components_from_dump(tx_digest).await?
880        };
881        self.execution_engine_execute_with_tx_info_impl(
882            &tx_info,
883            None,
884            expensive_safety_check_config,
885        )
886        .await
887    }
888
889    /// Executes a transaction with the state specified in `pre_run_sandbox`
890    /// This is useful for executing a transaction with a specific state
891    /// However if the state in invalid, the behavior is undefined.
892    pub async fn certificate_execute_with_sandbox_state(
893        pre_run_sandbox: &ExecutionSandboxState,
894    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
895        // These cannot be changed and are inherited from the sandbox state
896        let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
897        let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
898        let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
899        let protocol_config = ProtocolConfig::get_for_version(
900            pre_run_sandbox.transaction_info.protocol_version,
901            pre_run_sandbox.transaction_info.chain,
902        );
903        let required_objects = pre_run_sandbox.required_objects.clone();
904        let store = InMemoryStorage::new(required_objects.clone());
905
906        let transaction =
907            Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());
908
909        // TODO: This will not work for deleted shared objects. We need to persist that
910        // information in the sandbox. TODO: A lot of the following code is
911        // replicated in several places. We should introduce a few traits and
912        // make them shared so that we don't have to fix one by one when we have major
913        // execution layer changes.
914        let input_objects = store.read_input_objects_for_transaction(&transaction);
915        let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
916            VerifiedTransaction::new_unchecked(transaction),
917            executed_epoch,
918        );
919        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
920            &executable,
921            input_objects,
922            &protocol_config,
923            reference_gas_price,
924        )
925        .unwrap();
926        let (kind, signer, gas) = executable.transaction_data().execution_parts();
927        let executor = iota_execution::executor(&protocol_config, true, None).unwrap();
928        let (_, _, effects, exec_res) = executor.execute_transaction_to_effects(
929            &store,
930            &protocol_config,
931            Arc::new(LimitsMetrics::new(&Registry::new())),
932            true,
933            &HashSet::new(),
934            &executed_epoch,
935            epoch_start_timestamp,
936            input_objects,
937            gas,
938            gas_status,
939            kind,
940            signer,
941            *executable.digest(),
942            &mut None,
943        );
944
945        let effects =
946            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
947
948        Ok(ExecutionSandboxState {
949            transaction_info: pre_run_sandbox.transaction_info.clone(),
950            required_objects,
951            local_exec_temporary_store: None, // We dont capture it for cert exec run
952            local_exec_effects: effects,
953            local_exec_status: Some(exec_res),
954        })
955    }
956
957    /// Must be called after `init_for_execution`
958    /// This executes from
959    /// `iota_core::authority::AuthorityState::try_execute_immediately`
960    pub async fn certificate_execute(
961        &mut self,
962        tx_digest: &TransactionDigest,
963        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
964    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
965        // Use the lighterweight execution engine to get the pre-run state
966        let pre_run_sandbox = self
967            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
968            .await?;
969        Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
970    }
971
972    /// Must be called after `init_for_execution`
973    /// This executes from
974    /// `iota_adapter::execution_engine::execute_transaction_to_effects`
975    pub async fn execution_engine_execute(
976        &mut self,
977        tx_digest: &TransactionDigest,
978        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
979    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
980        let sandbox_state = self
981            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
982            .await?;
983
984        Ok(sandbox_state)
985    }
986
987    pub async fn execute_state_dump(
988        &mut self,
989        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
990    ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
991        assert!(!self.is_remote_replay());
992
993        let d = match self.fetcher.clone() {
994            Fetchers::NodeStateDump(d) => d,
995            _ => panic!("Invalid fetcher for state dump"),
996        };
997        let tx_digest = d.node_state_dump.clone().tx_digest;
998        let sandbox_state = self
999            .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1000            .await?;
1001
1002        Ok((sandbox_state, d.node_state_dump))
1003    }
1004
1005    pub async fn execute_transaction(
1006        &mut self,
1007        tx_digest: &TransactionDigest,
1008        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1009        use_authority: bool,
1010        executor_version: Option<i64>,
1011        protocol_version: Option<i64>,
1012        enable_profiler: Option<PathBuf>,
1013        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1014    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1015        self.executor_version = executor_version;
1016        self.protocol_version = protocol_version;
1017        self.enable_profiler = enable_profiler;
1018        self.config_and_versions = config_and_versions;
1019        if use_authority {
1020            self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1021                .await
1022        } else {
1023            self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1024                .await
1025        }
1026    }
1027    fn system_package_ids(_protocol_version: u64) -> Vec<ObjectID> {
1028        BuiltInFramework::all_package_ids()
1029    }
1030
1031    /// This is the only function which accesses the network during execution
1032    pub fn get_or_download_object(
1033        &self,
1034        obj_id: &ObjectID,
1035        package_expected: bool,
1036    ) -> Result<Option<Object>, ReplayEngineError> {
1037        if package_expected {
1038            if let Some(obj) = self
1039                .storage
1040                .package_cache
1041                .lock()
1042                .expect("Cannot lock")
1043                .get(obj_id)
1044            {
1045                return Ok(Some(obj.clone()));
1046            };
1047            // Check if its a system package because we must've downloaded all
1048            // TODO: Will return this check once we can download completely for
1049            // other networks assert!(
1050            //     !self.system_package_ids().contains(obj_id),
1051            //     "All system packages should be downloaded already"
1052            // );
1053        } else if let Some(obj) = self
1054            .storage
1055            .live_objects_store
1056            .lock()
1057            .expect("Can't lock")
1058            .get(obj_id)
1059        {
1060            return Ok(Some(obj.clone()));
1061        }
1062
1063        let Some(o) = self.download_latest_object(obj_id)? else {
1064            return Ok(None);
1065        };
1066
1067        if o.is_package() {
1068            assert!(
1069                package_expected,
1070                "Did not expect package but downloaded object is a package: {obj_id}"
1071            );
1072
1073            self.storage
1074                .package_cache
1075                .lock()
1076                .expect("Cannot lock")
1077                .insert(*obj_id, o.clone());
1078        }
1079        let o_ref = o.compute_object_reference();
1080        self.storage
1081            .object_version_cache
1082            .lock()
1083            .expect("Cannot lock")
1084            .insert((o_ref.0, o_ref.1), o.clone());
1085        Ok(Some(o))
1086    }
1087
1088    pub fn is_remote_replay(&self) -> bool {
1089        matches!(self.fetcher, Fetchers::Remote(_))
1090    }
1091
1092    /// Must be called after `populate_protocol_version_tables`
1093    pub fn system_package_versions_for_protocol_version(
1094        &self,
1095        protocol_version: u64,
1096    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1097        match &self.fetcher {
1098            Fetchers::Remote(_) => Ok(self
1099                .protocol_version_system_package_table
1100                .get(&protocol_version)
1101                .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1102                    protocol_version,
1103                })?
1104                .clone()
1105                .into_iter()
1106                .collect()),
1107
1108            Fetchers::NodeStateDump(d) => Ok(d
1109                .node_state_dump
1110                .relevant_system_packages
1111                .iter()
1112                .map(|w| (w.id, w.version, w.digest))
1113                .map(|q| (q.0, q.1))
1114                .collect()),
1115        }
1116    }
1117
1118    pub async fn protocol_ver_to_epoch_map(
1119        &self,
1120    ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
1121        let mut range_map = BTreeMap::new();
1122        let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;
1123
1124        // Exception for Genesis: Protocol version 1 at epoch 0
1125        let mut tx_digest = *self
1126            .fetcher
1127            .get_checkpoint_txs(0)
1128            .await?
1129            .first()
1130            .expect("Genesis TX must be in first checkpoint");
1131        // Somehow the genesis TX did not emit any event, but we know it was the start
1132        // of version 1 So we need to manually add this range
1133        let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
1134            (0, 1, Some(0u64));
1135
1136        let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
1137            (start_epoch, start_protocol_version, start_checkpoint);
1138
1139        (start_epoch, start_protocol_version, start_checkpoint) =
1140            (curr_epoch, curr_protocol_version, curr_checkpoint);
1141
1142        // This is the final tx digest for the epoch change. We need this to track the
1143        // final checkpoint
1144        let mut end_epoch_tx_digest = tx_digest;
1145
1146        for event in epoch_change_events {
1147            (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
1148            end_epoch_tx_digest = event.id.tx_digest;
1149
1150            if start_protocol_version == curr_protocol_version {
1151                // Same range
1152                continue;
1153            }
1154
1155            // Change in prot version
1156            // Find the last checkpoint
1157            curr_checkpoint = self
1158                .fetcher
1159                .get_transaction(&event.id.tx_digest)
1160                .await?
1161                .checkpoint;
1162            // Insert the last range
1163            range_map.insert(
1164                start_protocol_version,
1165                ProtocolVersionSummary {
1166                    protocol_version: start_protocol_version,
1167                    epoch_start: start_epoch,
1168                    epoch_end: curr_epoch - 1,
1169                    checkpoint_start: start_checkpoint,
1170                    checkpoint_end: curr_checkpoint.map(|x| x - 1),
1171                    epoch_change_tx: tx_digest,
1172                },
1173            );
1174
1175            start_epoch = curr_epoch;
1176            start_protocol_version = curr_protocol_version;
1177            tx_digest = event.id.tx_digest;
1178            start_checkpoint = curr_checkpoint;
1179        }
1180
1181        // Insert the last range
1182        range_map.insert(
1183            curr_protocol_version,
1184            ProtocolVersionSummary {
1185                protocol_version: curr_protocol_version,
1186                epoch_start: start_epoch,
1187                epoch_end: curr_epoch,
1188                checkpoint_start: curr_checkpoint,
1189                checkpoint_end: self
1190                    .fetcher
1191                    .get_transaction(&end_epoch_tx_digest)
1192                    .await?
1193                    .checkpoint,
1194                epoch_change_tx: tx_digest,
1195            },
1196        );
1197
1198        Ok(range_map)
1199    }
1200
1201    pub fn protocol_version_for_epoch(
1202        epoch: u64,
1203        mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1204    ) -> u64 {
1205        // Naive impl but works for now
1206        // Can improve with range algos & data structures
1207        let mut version = 1;
1208        for (k, v) in mp.iter().rev() {
1209            if v.1 <= epoch {
1210                version = *k;
1211                break;
1212            }
1213        }
1214        version
1215    }
1216
1217    pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1218        self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1219
1220        let system_package_revisions = self.system_package_versions().await?;
1221
1222        // This can be more efficient but small footprint so okay for now
1223        // Table is sorted from earliest to latest
1224        for (
1225            prot_ver,
1226            ProtocolVersionSummary {
1227                epoch_change_tx: tx_digest,
1228                ..
1229            },
1230        ) in self.protocol_version_epoch_table.clone()
1231        {
1232            // Use the previous versions protocol version table
1233            let mut working = if prot_ver <= 1 {
1234                BTreeMap::new()
1235            } else {
1236                self.protocol_version_system_package_table
1237                    .iter()
1238                    .rev()
1239                    .find(|(ver, _)| **ver <= prot_ver)
1240                    .expect("Prev entry must exist")
1241                    .1
1242                    .clone()
1243            };
1244
1245            for (id, versions) in system_package_revisions.iter() {
1246                // Oldest appears first in list, so reverse
1247                for ver in versions.iter().rev() {
1248                    if ver.1 == tx_digest {
1249                        // Found the version for this protocol version
1250                        working.insert(*id, ver.0);
1251                        break;
1252                    }
1253                }
1254            }
1255            self.protocol_version_system_package_table
1256                .insert(prot_ver, working);
1257        }
1258        Ok(())
1259    }
1260
1261    pub async fn system_package_versions(
1262        &self,
1263    ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1264    {
1265        let system_package_ids = Self::system_package_ids(
1266            *self
1267                .protocol_version_epoch_table
1268                .keys()
1269                .peekable()
1270                .last()
1271                .expect("Protocol version epoch table not populated"),
1272        );
1273        let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1274
1275        let mut mapping = BTreeMap::new();
1276
1277        // Extract all the transactions which created or mutated this object
1278        while !system_package_objs.is_empty() {
1279            // For the given object and its version, record the transaction which upgraded
1280            // or created it
1281            let previous_txs: Vec<_> = system_package_objs
1282                .iter()
1283                .map(|o| (o.compute_object_reference(), o.previous_transaction))
1284                .collect();
1285
1286            previous_txs.iter().for_each(|((id, ver, _), tx)| {
1287                mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1288            });
1289
1290            // Next round
1291            // Get the previous version of each object if exists
1292            let previous_ver_refs: Vec<_> = previous_txs
1293                .iter()
1294                .filter_map(|(q, _)| {
1295                    let prev_ver = u64::from(q.1) - 1;
1296                    if prev_ver == 0 {
1297                        None
1298                    } else {
1299                        Some((q.0, SequenceNumber::from(prev_ver)))
1300                    }
1301                })
1302                .collect();
1303            system_package_objs = match self.multi_download(&previous_ver_refs).await {
1304                Ok(packages) => packages,
1305                Err(ReplayEngineError::ObjectNotExist { id }) => {
1306                    // This happens when the RPC server prunes older object
1307                    // Replays in the current protocol version will work but old ones might not
1308                    // as we cannot fetch the package
1309                    warn!(
1310                        "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1311                        id
1312                    );
1313                    break;
1314                }
1315                Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1316                    // This happens when the RPC server prunes older object
1317                    // Replays in the current protocol version will work but old ones might not
1318                    // as we cannot fetch the package
1319                    warn!(
1320                        "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1321                        id, version
1322                    );
1323                    break;
1324                }
1325                Err(ReplayEngineError::ObjectVersionTooHigh {
1326                    id,
1327                    asked_version,
1328                    latest_version,
1329                }) => {
1330                    warn!(
1331                        "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1332                        id, asked_version, latest_version
1333                    );
1334                    break;
1335                }
1336                Err(ReplayEngineError::ObjectDeleted {
1337                    id,
1338                    version,
1339                    digest,
1340                }) => {
1341                    // This happens when the RPC server prunes older object
1342                    // Replays in the current protocol version will work but old ones might not
1343                    // as we cannot fetch the package
1344                    warn!(
1345                        "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1346                        id, version, digest
1347                    );
1348                    break;
1349                }
1350                Err(e) => return Err(e),
1351            };
1352        }
1353        Ok(mapping)
1354    }
1355
1356    pub async fn get_protocol_config(
1357        &self,
1358        epoch_id: EpochId,
1359        chain: Chain,
1360    ) -> Result<ProtocolConfig, ReplayEngineError> {
1361        match self.protocol_version {
1362            Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1363            Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1364            None => self
1365                .protocol_version_epoch_table
1366                .iter()
1367                .rev()
1368                .find(|(_, rg)| epoch_id >= rg.epoch_start)
1369                .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1370                .unwrap_or_else(|| {
1371                    Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1372                }),
1373        }
1374    }
1375
1376    pub async fn checkpoints_for_epoch(
1377        &self,
1378        epoch_id: u64,
1379    ) -> Result<(u64, u64), ReplayEngineError> {
1380        let epoch_change_events = self
1381            .fetcher
1382            .get_epoch_change_events(true)
1383            .await?
1384            .into_iter()
1385            .collect::<Vec<_>>();
1386        let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1387            (0, 1)
1388        } else {
1389            let idx = epoch_change_events
1390                .iter()
1391                .position(|ev| match extract_epoch_and_version(ev.clone()) {
1392                    Ok((epoch, _)) => epoch == epoch_id,
1393                    Err(_) => false,
1394                })
1395                .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1396            let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1397            (
1398                self.fetcher
1399                    .get_transaction(&epoch_change_tx)
1400                    .await?
1401                    .checkpoint
1402                    .unwrap_or_else(|| {
1403                        panic!(
1404                            "Checkpoint for transaction {epoch_change_tx} not present. Could be due to pruning"
1405                        )
1406                    }),
1407                idx,
1408            )
1409        };
1410
1411        let next_epoch_change_tx = epoch_change_events
1412            .get(start_epoch_idx + 1)
1413            .map(|v| v.id.tx_digest)
1414            .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1415
1416        let next_epoch_checkpoint = self
1417            .fetcher
1418            .get_transaction(&next_epoch_change_tx)
1419            .await?
1420            .checkpoint
1421            .unwrap_or_else(|| {
1422                panic!(
1423                    "Checkpoint for transaction {next_epoch_change_tx} not present. Could be due to pruning"
1424                )
1425            });
1426
1427        Ok((start_checkpoint, next_epoch_checkpoint - 1))
1428    }
1429
1430    pub async fn get_epoch_start_timestamp_and_rgp(
1431        &self,
1432        epoch_id: u64,
1433        tx_digest: &TransactionDigest,
1434    ) -> Result<(u64, u64), ReplayEngineError> {
1435        if epoch_id == 0 {
1436            return Err(ReplayEngineError::TransactionNotSupported {
1437                digest: *tx_digest,
1438                reason: "Transactions from epoch 0 not supported".to_string(),
1439            });
1440        }
1441        self.fetcher
1442            .get_epoch_start_timestamp_and_rgp(epoch_id)
1443            .await
1444    }
1445
1446    fn add_config_objects_if_needed(
1447        &self,
1448        status: &IotaExecutionStatus,
1449    ) -> Vec<(ObjectID, SequenceNumber)> {
1450        match parse_effect_error_for_denied_coins(status) {
1451            Some(coin_type) => {
1452                let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1453                    panic!(
1454                        "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1455                    );
1456                };
1457                // NB: the version of the deny list object doesn't matter
1458                if !config_id_and_version
1459                    .iter()
1460                    .any(|(id, _)| id == &IOTA_DENY_LIST_OBJECT_ID)
1461                {
1462                    let deny_list_oid_version = self.download_latest_object(&IOTA_DENY_LIST_OBJECT_ID)
1463                        .ok()
1464                        .flatten()
1465                        .expect("Unable to download the deny list object for a transaction that requires it")
1466                        .version();
1467                    config_id_and_version.push((IOTA_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1468                }
1469                config_id_and_version
1470            }
1471            None => vec![],
1472        }
1473    }
1474
1475    async fn resolve_tx_components(
1476        &self,
1477        tx_digest: &TransactionDigest,
1478    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1479        assert!(self.is_remote_replay());
1480        // Fetch full transaction content
1481        let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1482        let sender = match tx_info.clone().transaction.unwrap().data {
1483            iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.sender,
1484        };
1485        let IotaTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1486
1487        let config_objects = self.add_config_objects_if_needed(effects.status());
1488
1489        let raw_tx_bytes = tx_info.clone().raw_transaction;
1490        let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1491        let input_objs = orig_tx
1492            .transaction_data()
1493            .input_objects()
1494            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1495        let tx_kind_orig = orig_tx.transaction_data().kind();
1496
1497        // Download the objects at the version right before the execution of this TX
1498        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1499
1500        let shared_object_refs: Vec<ObjectRef> = effects
1501            .shared_objects()
1502            .iter()
1503            .map(|so_ref| {
1504                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1505                    unimplemented!(
1506                        "Replay of deleted shared object transactions is not supported yet"
1507                    );
1508                } else {
1509                    so_ref.to_object_ref()
1510                }
1511            })
1512            .collect();
1513        let gas_data = match tx_info.clone().transaction.unwrap().data {
1514            iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.gas_data,
1515        };
1516        let gas_object_refs: Vec<_> = gas_data
1517            .payment
1518            .iter()
1519            .map(|obj_ref| obj_ref.to_object_ref())
1520            .collect();
1521        let receiving_objs = orig_tx
1522            .transaction_data()
1523            .receiving_objects()
1524            .into_iter()
1525            .map(|(obj_id, version, _)| (obj_id, version))
1526            .collect();
1527
1528        let epoch_id = effects.executed_epoch;
1529        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1530
1531        // Extract the epoch start timestamp
1532        let (epoch_start_timestamp, reference_gas_price) = self
1533            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1534            .await?;
1535
1536        Ok(OnChainTransactionInfo {
1537            kind: tx_kind_orig.clone(),
1538            sender,
1539            modified_at_versions,
1540            input_objects: input_objs,
1541            shared_object_refs,
1542            gas: gas_object_refs,
1543            gas_budget: gas_data.budget,
1544            gas_price: gas_data.price,
1545            executed_epoch: epoch_id,
1546            dependencies: effects.dependencies().to_vec(),
1547            effects: IotaTransactionBlockEffects::V1(effects),
1548            receiving_objs,
1549            config_objects,
1550            // Find the protocol version for this epoch
1551            // This assumes we already initialized the protocol version table
1552            // `protocol_version_epoch_table`
1553            protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1554            tx_digest: *tx_digest,
1555            epoch_start_timestamp,
1556            sender_signed_data: orig_tx.clone(),
1557            reference_gas_price,
1558            chain,
1559        })
1560    }
1561
1562    async fn resolve_tx_components_from_dump(
1563        &self,
1564        tx_digest: &TransactionDigest,
1565    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1566        assert!(!self.is_remote_replay());
1567
1568        let dp = self.fetcher.as_node_state_dump();
1569
1570        let sender = dp
1571            .node_state_dump
1572            .sender_signed_data
1573            .transaction_data()
1574            .sender();
1575        let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1576        let effects = dp.node_state_dump.computed_effects.clone();
1577        let effects = IotaTransactionBlockEffects::try_from(effects).unwrap();
1578        // Config objects don't show up in the node state dump so they need to be
1579        // provided.
1580        let config_objects = self.add_config_objects_if_needed(effects.status());
1581
1582        // Fetch full transaction content
1583        // let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1584
1585        let input_objs = orig_tx
1586            .transaction_data()
1587            .input_objects()
1588            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1589        let tx_kind_orig = orig_tx.transaction_data().kind();
1590
1591        // Download the objects at the version right before the execution of this TX
1592        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1593
1594        let shared_object_refs: Vec<ObjectRef> = effects
1595            .shared_objects()
1596            .iter()
1597            .map(|so_ref| {
1598                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1599                    unimplemented!(
1600                        "Replay of deleted shared object transactions is not supported yet"
1601                    );
1602                } else {
1603                    so_ref.to_object_ref()
1604                }
1605            })
1606            .collect();
1607        let gas_data = orig_tx.transaction_data().gas_data();
1608        let gas_object_refs: Vec<_> = gas_data.clone().payment;
1609        let receiving_objs = orig_tx
1610            .transaction_data()
1611            .receiving_objects()
1612            .into_iter()
1613            .map(|(obj_id, version, _)| (obj_id, version))
1614            .collect();
1615
1616        let epoch_id = dp.node_state_dump.executed_epoch;
1617
1618        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1619
1620        let protocol_config =
1621            ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1622        // Extract the epoch start timestamp
1623        let (epoch_start_timestamp, reference_gas_price) = self
1624            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1625            .await?;
1626
1627        Ok(OnChainTransactionInfo {
1628            kind: tx_kind_orig.clone(),
1629            sender,
1630            modified_at_versions,
1631            input_objects: input_objs,
1632            shared_object_refs,
1633            gas: gas_object_refs,
1634            gas_budget: gas_data.budget,
1635            gas_price: gas_data.price,
1636            executed_epoch: epoch_id,
1637            dependencies: effects.dependencies().to_vec(),
1638            effects,
1639            receiving_objs,
1640            config_objects,
1641            protocol_version: protocol_config.version,
1642            tx_digest: *tx_digest,
1643            epoch_start_timestamp,
1644            sender_signed_data: orig_tx.clone(),
1645            reference_gas_price,
1646            chain,
1647        })
1648    }
1649
1650    async fn resolve_download_input_objects(
1651        &mut self,
1652        tx_info: &OnChainTransactionInfo,
1653        deleted_shared_objects: Vec<ObjectRef>,
1654    ) -> Result<InputObjects, ReplayEngineError> {
1655        // Download the input objects
1656        let mut package_inputs = vec![];
1657        let mut imm_owned_inputs = vec![];
1658        let mut shared_inputs = vec![];
1659        let mut deleted_shared_info_map = BTreeMap::new();
1660
1661        // for deleted shared objects, we need to look at the transaction dependencies
1662        // to find the correct transaction dependency for a deleted shared
1663        // object.
1664        if !deleted_shared_objects.is_empty() {
1665            for tx_digest in tx_info.dependencies.iter() {
1666                let tx_info = self.resolve_tx_components(tx_digest).await?;
1667                for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1668                    deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1669                }
1670            }
1671        }
1672
1673        tx_info
1674            .input_objects
1675            .iter()
1676            .map(|kind| match kind {
1677                InputObjectKind::MovePackage(i) => {
1678                    package_inputs.push(*i);
1679                    Ok(())
1680                }
1681                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1682                    imm_owned_inputs.push((o_ref.0, o_ref.1));
1683                    Ok(())
1684                }
1685                InputObjectKind::SharedMoveObject {
1686                    id,
1687                    initial_shared_version: _,
1688                    mutable: _,
1689                } if !deleted_shared_info_map.contains_key(id) => {
1690                    // We already downloaded
1691                    if let Some(o) = self
1692                        .storage
1693                        .live_objects_store
1694                        .lock()
1695                        .expect("Can't lock")
1696                        .get(id)
1697                    {
1698                        shared_inputs.push(o.clone());
1699                        Ok(())
1700                    } else {
1701                        Err(ReplayEngineError::InternalCacheInvariantViolation {
1702                            id: *id,
1703                            version: None,
1704                        })
1705                    }
1706                }
1707                _ => Ok(()),
1708            })
1709            .collect::<Result<Vec<_>, _>>()?;
1710
1711        // Download the imm and owned objects
1712        let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1713
1714        // For packages, download latest if non framework
1715        // If framework, download relevant for the current protocol version
1716        in_objs.extend(
1717            self.multi_download_relevant_packages_and_store(
1718                package_inputs,
1719                tx_info.protocol_version.as_u64(),
1720            )
1721            .await?,
1722        );
1723        // Add shared objects
1724        in_objs.extend(shared_inputs);
1725
1726        // TODO(Zhe): Account for cancelled transaction assigned version here, and
1727        // tests.
1728        let resolved_input_objs = tx_info
1729            .input_objects
1730            .iter()
1731            .flat_map(|kind| match kind {
1732                InputObjectKind::MovePackage(i) => {
1733                    // Okay to unwrap since we downloaded it
1734                    Some(ObjectReadResult::new(
1735                        *kind,
1736                        self.storage
1737                            .package_cache
1738                            .lock()
1739                            .expect("Cannot lock")
1740                            .get(i)
1741                            .unwrap_or(
1742                                &self
1743                                    .download_latest_object(i)
1744                                    .expect("Object download failed")
1745                                    .expect("Object not found on chain"),
1746                            )
1747                            .clone()
1748                            .into(),
1749                    ))
1750                }
1751                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1752                    *kind,
1753                    self.storage
1754                        .object_version_cache
1755                        .lock()
1756                        .expect("Cannot lock")
1757                        .get(&(o_ref.0, o_ref.1))
1758                        .unwrap()
1759                        .clone()
1760                        .into(),
1761                )),
1762                InputObjectKind::SharedMoveObject { id, .. }
1763                    if !deleted_shared_info_map.contains_key(id) =>
1764                {
1765                    // we already downloaded
1766                    Some(ObjectReadResult::new(
1767                        *kind,
1768                        self.storage
1769                            .live_objects_store
1770                            .lock()
1771                            .expect("Can't lock")
1772                            .get(id)
1773                            .unwrap()
1774                            .clone()
1775                            .into(),
1776                    ))
1777                }
1778                InputObjectKind::SharedMoveObject { id, .. } => {
1779                    let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1780                    Some(ObjectReadResult::new(
1781                        *kind,
1782                        ObjectReadResultKind::DeletedSharedObject(*version, *digest),
1783                    ))
1784                }
1785            })
1786            .collect();
1787
1788        Ok(InputObjects::new(resolved_input_objs))
1789    }
1790
1791    /// Given the OnChainTransactionInfo, download and store the input objects,
1792    /// and other info necessary for execution
1793    async fn initialize_execution_env_state(
1794        &mut self,
1795        tx_info: &OnChainTransactionInfo,
1796    ) -> Result<InputObjects, ReplayEngineError> {
1797        // We need this for other activities in this session
1798        self.current_protocol_version = tx_info.protocol_version.as_u64();
1799
1800        // Download the objects at the version right before the execution of this TX
1801        self.multi_download_and_store(&tx_info.modified_at_versions)
1802            .await?;
1803
1804        let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1805            .shared_object_refs
1806            .iter()
1807            .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1808
1809        // Download shared objects at the version right before the execution of this TX
1810        let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1811        self.multi_download_and_store(&shared_refs).await?;
1812
1813        // Download gas (although this should already be in cache from modified at
1814        // versions?)
1815        let gas_refs: Vec<_> = tx_info
1816            .gas
1817            .iter()
1818            .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
1819            .collect();
1820        self.multi_download_and_store(&gas_refs).await?;
1821
1822        // Fetch the input objects we know from the raw transaction
1823        let input_objs = self
1824            .resolve_download_input_objects(tx_info, deleted_shared_refs)
1825            .await?;
1826
1827        // Fetch the receiving objects
1828        self.multi_download_and_store(&tx_info.receiving_objs)
1829            .await?;
1830
1831        // Fetch specified config objects if any
1832        self.multi_download_and_store(&tx_info.config_objects)
1833            .await?;
1834
1835        // Prep the object runtime for dynamic fields
1836        // Download the child objects accessed at the version right before the execution
1837        // of this TX
1838        let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1839        self.multi_download_and_store(&loaded_child_refs).await?;
1840        tokio::task::yield_now().await;
1841
1842        Ok(input_objs)
1843    }
1844}
1845
1846// <---------------------  Implement necessary traits for LocalExec to work with
1847// exec engine ----------------------->
1848
1849impl BackingPackageStore for LocalExec {
1850    /// In this case we might need to download a dependency package which was
1851    /// not present in the modified at versions list because packages are
1852    /// immutable
1853    fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1854        fn inner(self_: &LocalExec, package_id: &ObjectID) -> IotaResult<Option<Object>> {
1855            // If package not present fetch it from the network
1856            self_
1857                .get_or_download_object(package_id, true /* we expect a Move package */)
1858                .map_err(|e| IotaError::Storage(e.to_string()))
1859        }
1860
1861        let res = inner(self, package_id);
1862        self.exec_store_events
1863            .lock()
1864            .expect("Unable to lock events list")
1865            .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1866                package_id: *package_id,
1867                result: res.clone(),
1868            });
1869        res.map(|o| o.map(PackageObject::new))
1870    }
1871}
1872
1873impl ChildObjectResolver for LocalExec {
1874    /// This uses `get_object`, which does not download from the network
1875    /// Hence all objects must be in store already
1876    fn read_child_object(
1877        &self,
1878        parent: &ObjectID,
1879        child: &ObjectID,
1880        child_version_upper_bound: SequenceNumber,
1881    ) -> IotaResult<Option<Object>> {
1882        fn inner(
1883            self_: &LocalExec,
1884            parent: &ObjectID,
1885            child: &ObjectID,
1886            child_version_upper_bound: SequenceNumber,
1887        ) -> IotaResult<Option<Object>> {
1888            let child_object =
1889                match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1890                    None => return Ok(None),
1891                    Some(o) => o,
1892                };
1893            let child_version = child_object.version();
1894            if child_object.version() > child_version_upper_bound {
1895                return Err(IotaError::Unknown(format!(
1896                    "Invariant Violation. Replay loaded child_object {child} at version \
1897                    {child_version} but expected the version to be <= {child_version_upper_bound}"
1898                )));
1899            }
1900            let parent = *parent;
1901            if child_object.owner != Owner::ObjectOwner(parent.into()) {
1902                return Err(IotaError::InvalidChildObjectAccess {
1903                    object: *child,
1904                    given_parent: parent,
1905                    actual_owner: child_object.owner,
1906                });
1907            }
1908            Ok(Some(child_object))
1909        }
1910
1911        let res = inner(self, parent, child, child_version_upper_bound);
1912        self.exec_store_events
1913            .lock()
1914            .expect("Unable to lock events list")
1915            .push(
1916                ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1917                    parent: *parent,
1918                    child: *child,
1919                    result: res.clone(),
1920                },
1921            );
1922        res
1923    }
1924
1925    fn get_object_received_at_version(
1926        &self,
1927        owner: &ObjectID,
1928        receiving_object_id: &ObjectID,
1929        receive_object_at_version: SequenceNumber,
1930        _epoch_id: EpochId,
1931    ) -> IotaResult<Option<Object>> {
1932        fn inner(
1933            self_: &LocalExec,
1934            owner: &ObjectID,
1935            receiving_object_id: &ObjectID,
1936            receive_object_at_version: SequenceNumber,
1937        ) -> IotaResult<Option<Object>> {
1938            let recv_object = match self_.get_object(receiving_object_id)? {
1939                None => return Ok(None),
1940                Some(o) => o,
1941            };
1942            if recv_object.version() != receive_object_at_version {
1943                return Err(IotaError::Unknown(format!(
1944                    "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
1945                    {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
1946                )));
1947            }
1948            if recv_object.owner != Owner::AddressOwner((*owner).into()) {
1949                return Ok(None);
1950            }
1951            Ok(Some(recv_object))
1952        }
1953
1954        let res = inner(self, owner, receiving_object_id, receive_object_at_version);
1955        self.exec_store_events
1956            .lock()
1957            .expect("Unable to lock events list")
1958            .push(ExecutionStoreEvent::ReceiveObject {
1959                owner: *owner,
1960                receive: *receiving_object_id,
1961                receive_at_version: receive_object_at_version,
1962                result: res.clone(),
1963            });
1964        res
1965    }
1966}
1967
1968impl ResourceResolver for LocalExec {
1969    type Error = IotaError;
1970
1971    /// In this case we might need to download a Move object on the fly which
1972    /// was not present in the modified at versions list because packages
1973    /// are immutable
1974    fn get_resource(
1975        &self,
1976        address: &AccountAddress,
1977        typ: &StructTag,
1978    ) -> IotaResult<Option<Vec<u8>>> {
1979        fn inner(
1980            self_: &LocalExec,
1981            address: &AccountAddress,
1982            typ: &StructTag,
1983        ) -> IotaResult<Option<Vec<u8>>> {
1984            // If package not present fetch it from the network or some remote location
1985            let Some(object) = self_.get_or_download_object(
1986                &ObjectID::from(*address),
1987                false, // we expect a Move obj
1988            )?
1989            else {
1990                return Ok(None);
1991            };
1992
1993            match &object.data {
1994                Data::Move(m) => {
1995                    assert!(
1996                        m.is_type(typ),
1997                        "Invariant violation: ill-typed object in storage \
1998                        or bad object request from caller"
1999                    );
2000                    Ok(Some(m.contents().to_vec()))
2001                }
2002                other => unimplemented!(
2003                    "Bad object lookup: expected Move object, but got {:#?}",
2004                    other
2005                ),
2006            }
2007        }
2008
2009        let res = inner(self, address, typ);
2010        self.exec_store_events
2011            .lock()
2012            .expect("Unable to lock events list")
2013            .push(ExecutionStoreEvent::ResourceResolverGetResource {
2014                address: *address,
2015                typ: typ.clone(),
2016                result: res.clone(),
2017            });
2018        res
2019    }
2020}
2021
2022impl ModuleResolver for LocalExec {
2023    type Error = IotaError;
2024
2025    /// This fetches a module which must already be present in the store
2026    /// We do not download
2027    fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2028        fn inner(self_: &LocalExec, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2029            get_module(self_, module_id)
2030        }
2031
2032        let res = inner(self, module_id);
2033        self.exec_store_events
2034            .lock()
2035            .expect("Unable to lock events list")
2036            .push(ExecutionStoreEvent::ModuleResolverGetModule {
2037                module_id: module_id.clone(),
2038                result: res.clone(),
2039            });
2040        res
2041    }
2042}
2043
2044impl ModuleResolver for &mut LocalExec {
2045    type Error = IotaError;
2046
2047    fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2048        // Recording event here will be double-counting since its already recorded in
2049        // the get_module fn
2050        (**self).get_module(module_id)
2051    }
2052}
2053
2054impl ObjectStore for LocalExec {
2055    /// The object must be present in store by normal process we used to
2056    /// backfill store in init We dont download if not present
2057    fn get_object(
2058        &self,
2059        object_id: &ObjectID,
2060    ) -> iota_types::storage::error::Result<Option<Object>> {
2061        let res = self
2062            .storage
2063            .live_objects_store
2064            .lock()
2065            .expect("Can't lock")
2066            .get(object_id)
2067            .cloned();
2068        self.exec_store_events
2069            .lock()
2070            .expect("Unable to lock events list")
2071            .push(ExecutionStoreEvent::ObjectStoreGetObject {
2072                object_id: *object_id,
2073                result: Ok(res.clone()),
2074            });
2075        Ok(res)
2076    }
2077
2078    /// The object must be present in store by normal process we used to
2079    /// backfill store in init We dont download if not present
2080    fn get_object_by_key(
2081        &self,
2082        object_id: &ObjectID,
2083        version: VersionNumber,
2084    ) -> iota_types::storage::error::Result<Option<Object>> {
2085        let res = self
2086            .storage
2087            .live_objects_store
2088            .lock()
2089            .expect("Can't lock")
2090            .get(object_id)
2091            .and_then(|obj| {
2092                if obj.version() == version {
2093                    Some(obj.clone())
2094                } else {
2095                    None
2096                }
2097            });
2098
2099        self.exec_store_events
2100            .lock()
2101            .expect("Unable to lock events list")
2102            .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2103                object_id: *object_id,
2104                version,
2105                result: Ok(res.clone()),
2106            });
2107
2108        Ok(res)
2109    }
2110}
2111
2112impl ObjectStore for &mut LocalExec {
2113    fn get_object(
2114        &self,
2115        object_id: &ObjectID,
2116    ) -> iota_types::storage::error::Result<Option<Object>> {
2117        // Recording event here will be double-counting since its already recorded in
2118        // the get_module fn
2119        (**self).get_object(object_id)
2120    }
2121
2122    fn get_object_by_key(
2123        &self,
2124        object_id: &ObjectID,
2125        version: VersionNumber,
2126    ) -> iota_types::storage::error::Result<Option<Object>> {
2127        // Recording event here will be double-counting since its already recorded in
2128        // the get_module fn
2129        (**self).get_object_by_key(object_id, version)
2130    }
2131}
2132
2133impl GetModule for LocalExec {
2134    type Error = IotaError;
2135    type Item = CompiledModule;
2136
2137    fn get_module_by_id(&self, id: &ModuleId) -> IotaResult<Option<Self::Item>> {
2138        let res = get_module_by_id(self, id);
2139
2140        self.exec_store_events
2141            .lock()
2142            .expect("Unable to lock events list")
2143            .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2144                id: id.clone(),
2145                result: res.clone(),
2146            });
2147        res
2148    }
2149}
2150
2151// <--------------------- Util functions ----------------------->
2152
2153pub fn get_executor(
2154    executor_version_override: Option<i64>,
2155    protocol_config: &ProtocolConfig,
2156    _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2157    enable_profiler: Option<PathBuf>,
2158) -> Arc<dyn Executor + Send + Sync> {
2159    let protocol_config = executor_version_override
2160        .map(|q| {
2161            let ver = if q < 0 {
2162                ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2163            } else {
2164                q as u64
2165            };
2166
2167            let mut c = protocol_config.clone();
2168            c.set_execution_version_for_testing(ver);
2169            c
2170        })
2171        .unwrap_or(protocol_config.clone());
2172
2173    let silent = true;
2174    iota_execution::executor(&protocol_config, silent, enable_profiler)
2175        .expect("Creating an executor should not fail here")
2176}
2177
2178fn parse_effect_error_for_denied_coins(status: &IotaExecutionStatus) -> Option<String> {
2179    let IotaExecutionStatus::Failure { error } = status else {
2180        return None;
2181    };
2182    parse_denied_error_string(error)
2183}
2184
2185fn parse_denied_error_string(error: &str) -> Option<String> {
2186    let regulated_regex = regex::Regex::new(
2187        r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2188    )
2189    .unwrap();
2190
2191    let caps = regulated_regex.captures(error)?;
2192    Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2193}
2194
2195#[cfg(test)]
2196mod tests {
2197    use super::parse_denied_error_string;
2198    #[test]
2199    fn test_regex_regulated_coin_errors() {
2200        let test_bank = vec![
2201            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
2202            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
2203        ];
2204        let expected_string =
2205            "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";
2206
2207        for test in &test_bank {
2208            assert!(parse_denied_error_string(test).unwrap() == expected_string);
2209        }
2210    }
2211}