iota_replay/
replay.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashSet},
7    path::PathBuf,
8    sync::{Arc, Mutex},
9};
10
11use futures::executor::block_on;
12use iota_config::node::ExpensiveSafetyCheckConfig;
13use iota_core::authority::NodeStateDump;
14use iota_execution::Executor;
15use iota_framework::BuiltInFramework;
16use iota_json_rpc_types::{
17    IotaExecutionStatus, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI,
18};
19use iota_protocol_config::{Chain, ProtocolConfig};
20use iota_sdk::{IotaClient, IotaClientBuilder};
21use iota_types::{
22    IOTA_DENY_LIST_OBJECT_ID,
23    base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
24    committee::EpochId,
25    digests::{ObjectDigest, TransactionDigest},
26    error::{ExecutionError, IotaError, IotaResult},
27    executable_transaction::VerifiedExecutableTransaction,
28    gas::IotaGasStatus,
29    in_memory_storage::InMemoryStorage,
30    inner_temporary_store::InnerTemporaryStore,
31    message_envelope::Message,
32    metrics::LimitsMetrics,
33    object::{Data, Object, Owner},
34    storage::{
35        BackingPackageStore, ChildObjectResolver, ObjectStore, PackageObject, get_module,
36        get_module_by_id,
37    },
38    transaction::{
39        CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
40        SenderSignedData, Transaction, TransactionDataAPI, TransactionKind,
41        TransactionKind::ProgrammableTransaction, VerifiedTransaction,
42    },
43};
44use move_binary_format::CompiledModule;
45use move_bytecode_utils::module_cache::GetModule;
46use move_core_types::{
47    account_address::AccountAddress,
48    language_storage::{ModuleId, StructTag},
49    resolver::{ModuleResolver, ResourceResolver},
50};
51use prometheus::Registry;
52use serde::{Deserialize, Serialize};
53use similar::{ChangeTag, TextDiff};
54use tracing::{error, info, trace, warn};
55
56use crate::{
57    chain_from_chain_id,
58    data_fetcher::{
59        DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
60    },
61    displays::{
62        Pretty,
63        transaction_displays::{FullPTB, transform_command_results_to_annotated},
64    },
65    types::*,
66};
67
68// TODO: add persistent cache. But perf is good enough already.
69
/// Result of replaying one transaction locally, bundled with the on-chain
/// information it is compared against.
///
/// Fields annotated with `#[serde(skip)]` are left out of the serialized form.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExecutionSandboxState {
    /// Information describing the transaction
    pub transaction_info: OnChainTransactionInfo,
    /// All the objects that are required for the execution of the transaction
    pub required_objects: Vec<Object>,
    /// Temporary store from executing this locally in
    /// `execute_transaction_to_effects`
    #[serde(skip)]
    pub local_exec_temporary_store: Option<InnerTemporaryStore>,
    /// Effects from executing this locally in `execute_transaction_to_effects`
    pub local_exec_effects: IotaTransactionBlockEffects,
    /// Status from executing this locally in `execute_transaction_to_effects`
    #[serde(skip)]
    pub local_exec_status: Option<Result<(), ExecutionError>>,
}
86
87impl ExecutionSandboxState {
88    pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
89        if self.transaction_info.effects != self.local_exec_effects {
90            error!("Replay tool forked {}", self.transaction_info.tx_digest);
91            let diff = self.diff_effects();
92            println!("{}", diff);
93            return Err(ReplayEngineError::EffectsForked {
94                digest: self.transaction_info.tx_digest,
95                diff: format!("\n{}", diff),
96                on_chain: Box::new(self.transaction_info.effects.clone()),
97                local: Box::new(self.local_exec_effects.clone()),
98            });
99        }
100        Ok(())
101    }
102
103    /// Utility to diff effects in a human readable format
104    pub fn diff_effects(&self) -> String {
105        let eff1 = &self.transaction_info.effects;
106        let eff2 = &self.local_exec_effects;
107        let on_chain_str = format!("{:#?}", eff1);
108        let local_chain_str = format!("{:#?}", eff2);
109        let mut res = vec![];
110
111        let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
112        for change in diff.iter_all_changes() {
113            let sign = match change.tag() {
114                ChangeTag::Delete => "---",
115                ChangeTag::Insert => "+++",
116                ChangeTag::Equal => "   ",
117            };
118            res.push(format!("{}{}", sign, change));
119        }
120
121        res.join("")
122    }
123}
124
/// Epoch and checkpoint range covered by a single protocol version, together
/// with the transaction that triggered the epoch change.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolVersionSummary {
    /// Protocol version at this point
    pub protocol_version: u64,
    /// The first epoch that uses this protocol version
    pub epoch_start: u64,
    /// The last epoch that uses this protocol version
    pub epoch_end: u64,
    /// The first checkpoint in this protocol version
    pub checkpoint_start: Option<u64>,
    /// The last checkpoint in this protocol version
    pub checkpoint_end: Option<u64>,
    /// The transaction which triggered this epoch change
    pub epoch_change_tx: TransactionDigest,
}
140
/// In-memory object stores used during replay.
///
/// Every store sits behind an `Arc<Mutex<..>>`, so cloning a `Storage` yields
/// handles to the same underlying maps rather than copies of them.
#[derive(Clone)]
pub struct Storage {
    /// These are objects at the frontier of the execution's view
    /// They might not be the latest object currently but they are the latest
    /// objects for the TX at the time it was run
    /// This store cannot be shared between runners
    pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,

    /// Package cache and object version cache can be shared between runners
    /// Non system packages are immutable so we can cache these
    pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
    /// Object contents are frozen at their versions so we can cache these
    /// We must place system packages here as well
    pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
}
156
157impl std::fmt::Display for Storage {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        writeln!(f, "Live object store")?;
160        for (id, obj) in self
161            .live_objects_store
162            .lock()
163            .expect("Unable to lock")
164            .iter()
165        {
166            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
167        }
168        writeln!(f, "Package cache")?;
169        for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
170            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
171        }
172        writeln!(f, "Object version cache")?;
173        for (id, _) in self
174            .object_version_cache
175            .lock()
176            .expect("Unable to lock")
177            .iter()
178        {
179            writeln!(f, "{}: {}", id.0, id.1)?;
180        }
181
182        write!(f, "")
183    }
184}
185
186impl Storage {
187    pub fn default() -> Self {
188        Self {
189            live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
190            package_cache: Arc::new(Mutex::new(BTreeMap::new())),
191            object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
192        }
193    }
194
195    pub fn all_objects(&self) -> Vec<Object> {
196        self.live_objects_store
197            .lock()
198            .expect("Unable to lock")
199            .values()
200            .cloned()
201            .chain(
202                self.package_cache
203                    .lock()
204                    .expect("Unable to lock")
205                    .iter()
206                    .map(|(_, obj)| obj.clone()),
207            )
208            .chain(
209                self.object_version_cache
210                    .lock()
211                    .expect("Unable to lock")
212                    .iter()
213                    .map(|(_, obj)| obj.clone()),
214            )
215            .collect::<Vec<_>>()
216    }
217}
218
/// State of the local replay engine: data sources, protocol-version lookup
/// tables, object stores and optional execution overrides.
#[derive(Clone)]
pub struct LocalExec {
    /// RPC client. `new_for_remote` stores the client here, while
    /// `new_for_state_dump` leaves it as `None`.
    pub client: Option<IotaClient>,
    /// For a given protocol version, what TX created it, and what is the valid
    /// range of epochs at this protocol version.
    pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
    /// For a given protocol version, the mapping of valid sequence numbers for
    /// each framework package
    pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
    /// The current protocol version for this execution
    pub current_protocol_version: u64,
    /// All state is contained here
    pub storage: Storage,
    /// Debug events
    pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
    /// Limits metrics handed to the executor when a transaction is executed
    pub metrics: Arc<LimitsMetrics>,
    /// Used for fetching data from the network or remote store
    pub fetcher: Fetchers,

    /// One can optionally override the executor version
    /// -1 implies use latest version
    pub executor_version: Option<i64>,
    /// One can optionally override the protocol version
    /// -1 implies use latest version
    /// None implies use the protocol version at the time of execution
    pub protocol_version: Option<i64>,
    /// Whether or not to enable the gas profiler, the PathBuf contains either a
    /// user specified filepath or the default current directory and name
    /// format for the profile output
    pub enable_profiler: Option<PathBuf>,
    // NOTE(review): presumably config objects and the versions to load them
    // at — confirm against the code that consumes this field.
    pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
    /// Retry policy for RPC timeouts: number of retries
    pub num_retries_for_timeout: u32,
    /// Retry policy for RPC timeouts: sleep between attempts
    pub sleep_period_for_timeout: std::time::Duration,
}
253
254impl LocalExec {
255    /// Wrapper around fetcher in case we want to add more functionality
256    /// Such as fetching from local DB from snapshot
257    pub async fn multi_download(
258        &self,
259        objs: &[(ObjectID, SequenceNumber)],
260    ) -> Result<Vec<Object>, ReplayEngineError> {
261        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
262        while num_retries_for_timeout >= 0 {
263            match self.fetcher.multi_get_versioned(objs).await {
264                Ok(objs) => return Ok(objs),
265                Err(ReplayEngineError::IotaRpcRequestTimeout) => {
266                    warn!(
267                        "RPC request timed out. Retries left {}. Sleeping for {}s",
268                        num_retries_for_timeout,
269                        self.sleep_period_for_timeout.as_secs()
270                    );
271                    num_retries_for_timeout -= 1;
272                    tokio::time::sleep(self.sleep_period_for_timeout).await;
273                }
274                Err(e) => return Err(e),
275            }
276        }
277        Err(ReplayEngineError::IotaRpcRequestTimeout)
278    }
279    /// Wrapper around fetcher in case we want to add more functionality
280    /// Such as fetching from local DB from snapshot
281    pub async fn multi_download_latest(
282        &self,
283        objs: &[ObjectID],
284    ) -> Result<Vec<Object>, ReplayEngineError> {
285        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
286        while num_retries_for_timeout >= 0 {
287            match self.fetcher.multi_get_latest(objs).await {
288                Ok(objs) => return Ok(objs),
289                Err(ReplayEngineError::IotaRpcRequestTimeout) => {
290                    warn!(
291                        "RPC request timed out. Retries left {}. Sleeping for {}s",
292                        num_retries_for_timeout,
293                        self.sleep_period_for_timeout.as_secs()
294                    );
295                    num_retries_for_timeout -= 1;
296                    tokio::time::sleep(self.sleep_period_for_timeout).await;
297                }
298                Err(e) => return Err(e),
299            }
300        }
301        Err(ReplayEngineError::IotaRpcRequestTimeout)
302    }
303
304    pub async fn fetch_loaded_child_refs(
305        &self,
306        tx_digest: &TransactionDigest,
307    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
308        // Get the child objects loaded
309        self.fetcher.get_loaded_child_objects(tx_digest).await
310    }
311
312    pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
313        Self::new_for_remote(
314            IotaClientBuilder::default()
315                .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
316                .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
317                .build(http_url)
318                .await?,
319            None,
320        )
321        .await
322    }
323
324    pub async fn replay_with_network_config(
325        rpc_url: String,
326        tx_digest: TransactionDigest,
327        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
328        use_authority: bool,
329        executor_version: Option<i64>,
330        protocol_version: Option<i64>,
331        enable_profiler: Option<PathBuf>,
332        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
333    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
334        info!("Using RPC URL: {}", rpc_url);
335        LocalExec::new_from_fn_url(&rpc_url)
336            .await?
337            .init_for_execution()
338            .await?
339            .execute_transaction(
340                &tx_digest,
341                expensive_safety_check_config,
342                use_authority,
343                executor_version,
344                protocol_version,
345                enable_profiler,
346                config_and_versions,
347            )
348            .await
349    }
350
    /// This captures the state of the network at a given point in time and
    /// populates the protocol version tables, including which system packages
    /// to fetch. If this function is called across epoch boundaries, the info
    /// might be stale. But it should only be called once per epoch.
    ///
    /// Consumes `self` and returns it once the tables are populated.
    pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
        self.populate_protocol_version_tables().await?;
        // Hand control back to the async runtime before returning.
        tokio::task::yield_now().await;
        Ok(self)
    }
360
361    pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
362        Self::new_for_remote(
363            self.client.expect("Remote client not initialized"),
364            Some(self.fetcher.into_remote()),
365        )
366        .await?
367        .init_for_execution()
368        .await
369    }
370
371    pub async fn new_for_remote(
372        client: IotaClient,
373        remote_fetcher: Option<RemoteFetcher>,
374    ) -> Result<Self, ReplayEngineError> {
375        // Use a throwaway metrics registry for local execution.
376        let registry = prometheus::Registry::new();
377        let metrics = Arc::new(LimitsMetrics::new(&registry));
378
379        let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
380
381        Ok(Self {
382            client: Some(client),
383            protocol_version_epoch_table: BTreeMap::new(),
384            protocol_version_system_package_table: BTreeMap::new(),
385            current_protocol_version: 0,
386            exec_store_events: Arc::new(Mutex::new(Vec::new())),
387            metrics,
388            storage: Storage::default(),
389            fetcher: Fetchers::Remote(fetcher),
390            // TODO: make these configurable
391            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
392            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
393            executor_version: None,
394            protocol_version: None,
395            enable_profiler: None,
396            config_and_versions: None,
397        })
398    }
399
400    pub async fn new_for_state_dump(
401        path: &str,
402        backup_rpc_url: Option<String>,
403    ) -> Result<Self, ReplayEngineError> {
404        // Use a throwaway metrics registry for local execution.
405        let registry = prometheus::Registry::new();
406        let metrics = Arc::new(LimitsMetrics::new(&registry));
407
408        let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
409        let current_protocol_version = state.protocol_version;
410        let fetcher = match backup_rpc_url {
411            Some(url) => NodeStateDumpFetcher::new(
412                state,
413                Some(RemoteFetcher::new(
414                    IotaClientBuilder::default()
415                        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
416                        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
417                        .build(url)
418                        .await?,
419                )),
420            ),
421            None => NodeStateDumpFetcher::new(state, None),
422        };
423
424        Ok(Self {
425            client: None,
426            protocol_version_epoch_table: BTreeMap::new(),
427            protocol_version_system_package_table: BTreeMap::new(),
428            current_protocol_version,
429            exec_store_events: Arc::new(Mutex::new(Vec::new())),
430            metrics,
431            storage: Storage::default(),
432            fetcher: Fetchers::NodeStateDump(fetcher),
433            // TODO: make these configurable
434            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
435            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
436            executor_version: None,
437            protocol_version: None,
438            enable_profiler: None,
439            config_and_versions: None,
440        })
441    }
442
443    pub async fn multi_download_and_store(
444        &mut self,
445        objs: &[(ObjectID, SequenceNumber)],
446    ) -> Result<Vec<Object>, ReplayEngineError> {
447        let objs = self.multi_download(objs).await?;
448
449        // Backfill the store
450        for obj in objs.iter() {
451            let o_ref = obj.compute_object_reference();
452            self.storage
453                .live_objects_store
454                .lock()
455                .expect("Can't lock")
456                .insert(o_ref.0, obj.clone());
457            self.storage
458                .object_version_cache
459                .lock()
460                .expect("Cannot lock")
461                .insert((o_ref.0, o_ref.1), obj.clone());
462            if obj.is_package() {
463                self.storage
464                    .package_cache
465                    .lock()
466                    .expect("Cannot lock")
467                    .insert(o_ref.0, obj.clone());
468            }
469        }
470        tokio::task::yield_now().await;
471        Ok(objs)
472    }
473
474    pub async fn multi_download_relevant_packages_and_store(
475        &mut self,
476        objs: Vec<ObjectID>,
477        protocol_version: u64,
478    ) -> Result<Vec<Object>, ReplayEngineError> {
479        let syst_packages = self.system_package_versions_for_protocol_version(protocol_version)?;
480        let syst_packages_objs = self.multi_download(&syst_packages).await?;
481
482        // Download latest version of all packages that are not system packages
483        // This is okay since the versions can never change
484        let non_system_package_objs: Vec<_> = objs
485            .into_iter()
486            .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
487            .collect();
488        let objs = self
489            .multi_download_latest(&non_system_package_objs)
490            .await?
491            .into_iter()
492            .chain(syst_packages_objs.into_iter());
493
494        for obj in objs.clone() {
495            let o_ref = obj.compute_object_reference();
496            // We dont always want the latest in store
497            // self.storage.store.insert(o_ref.0, obj.clone());
498            self.storage
499                .object_version_cache
500                .lock()
501                .expect("Cannot lock")
502                .insert((o_ref.0, o_ref.1), obj.clone());
503            if obj.is_package() {
504                self.storage
505                    .package_cache
506                    .lock()
507                    .expect("Cannot lock")
508                    .insert(o_ref.0, obj.clone());
509            }
510        }
511        Ok(objs.collect())
512    }
513
514    // TODO: remove this after `futures::executor::block_on` is removed.
515    #[expect(clippy::disallowed_methods)]
516    pub fn download_object(
517        &self,
518        object_id: &ObjectID,
519        version: SequenceNumber,
520    ) -> Result<Object, ReplayEngineError> {
521        if self
522            .storage
523            .object_version_cache
524            .lock()
525            .expect("Cannot lock")
526            .contains_key(&(*object_id, version))
527        {
528            return Ok(self
529                .storage
530                .object_version_cache
531                .lock()
532                .expect("Cannot lock")
533                .get(&(*object_id, version))
534                .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
535                    id: *object_id,
536                    version: Some(version),
537                })?
538                .clone());
539        }
540
541        let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
542            q.pop().unwrap_or_else(|| {
543                panic!(
544                    "Downloaded obj response cannot be empty {:?}",
545                    (*object_id, version)
546                )
547            })
548        })?;
549
550        let o_ref = o.compute_object_reference();
551        self.storage
552            .object_version_cache
553            .lock()
554            .expect("Cannot lock")
555            .insert((o_ref.0, o_ref.1), o.clone());
556        Ok(o)
557    }
558
559    // TODO: remove this after `futures::executor::block_on` is removed.
560    #[expect(clippy::disallowed_methods)]
561    pub fn download_latest_object(
562        &self,
563        object_id: &ObjectID,
564    ) -> Result<Option<Object>, ReplayEngineError> {
565        let resp = block_on({
566            // info!("Downloading latest object {object_id}");
567            self.multi_download_latest(&[*object_id])
568        })
569        .map(|mut q| {
570            q.pop()
571                .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
572        });
573
574        match resp {
575            Ok(v) => Ok(Some(v)),
576            Err(ReplayEngineError::ObjectNotExist { id }) => {
577                error!(
578                    "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
579                );
580                Ok(None)
581            }
582            Err(ReplayEngineError::ObjectDeleted {
583                id,
584                version,
585                digest,
586            }) => {
587                error!("Object {id} {version} {digest} was deleted on RPC server.");
588                Ok(None)
589            }
590            Err(err) => Err(ReplayEngineError::IotaRpcError {
591                err: err.to_string(),
592            }),
593        }
594    }
595
596    #[expect(clippy::disallowed_methods)]
597    pub fn download_object_by_upper_bound(
598        &self,
599        object_id: &ObjectID,
600        version_upper_bound: VersionNumber,
601    ) -> Result<Option<Object>, ReplayEngineError> {
602        let local_object = self
603            .storage
604            .live_objects_store
605            .lock()
606            .expect("Can't lock")
607            .get(object_id)
608            .cloned();
609        if local_object.is_some() {
610            return Ok(local_object);
611        }
612        let response = block_on({
613            self.fetcher
614                .get_child_object(object_id, version_upper_bound)
615        });
616        match response {
617            Ok(object) => {
618                let obj_ref = object.compute_object_reference();
619                self.storage
620                    .live_objects_store
621                    .lock()
622                    .expect("Can't lock")
623                    .insert(*object_id, object.clone());
624                self.storage
625                    .object_version_cache
626                    .lock()
627                    .expect("Can't lock")
628                    .insert((obj_ref.0, obj_ref.1), object.clone());
629                Ok(Some(object))
630            }
631            Err(ReplayEngineError::ObjectNotExist { id }) => {
632                error!(
633                    "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
634                );
635                Ok(None)
636            }
637            Err(ReplayEngineError::ObjectDeleted {
638                id,
639                version,
640                digest,
641            }) => {
642                error!("Object {id} {version} {digest} was deleted on RPC server.");
643                Ok(None)
644            }
645            // This is a child object which was not found in the store (e.g., due to exists
646            // check before creating the dynamic field).
647            Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
648                info!(
649                    "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
650                );
651                Ok(None)
652            }
653            Err(err) => Err(ReplayEngineError::IotaRpcError {
654                err: err.to_string(),
655            }),
656        }
657    }
658
659    pub async fn get_checkpoint_txs(
660        &self,
661        checkpoint_id: u64,
662    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
663        self.fetcher
664            .get_checkpoint_txs(checkpoint_id)
665            .await
666            .map_err(|e| ReplayEngineError::IotaRpcError { err: e.to_string() })
667    }
668
669    pub async fn execute_all_in_checkpoints(
670        &mut self,
671        checkpoint_ids: &[u64],
672        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
673        terminate_early: bool,
674        use_authority: bool,
675    ) -> Result<(u64, u64), ReplayEngineError> {
676        // Get all the TXs at this checkpoint
677        let mut txs = Vec::new();
678        for checkpoint_id in checkpoint_ids {
679            txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
680        }
681        let num = txs.len();
682        let mut succeeded = 0;
683        for tx in txs {
684            match self
685                .execute_transaction(
686                    &tx,
687                    expensive_safety_check_config.clone(),
688                    use_authority,
689                    None,
690                    None,
691                    None,
692                    None,
693                )
694                .await
695                .map(|q| q.check_effects())
696            {
697                Err(e) | Ok(Err(e)) => {
698                    if terminate_early {
699                        return Err(e);
700                    }
701                    error!("Error executing tx: {},  {:#?}", tx, e);
702                    continue;
703                }
704                _ => (),
705            }
706
707            succeeded += 1;
708        }
709        Ok((succeeded, num as u64))
710    }
711
712    pub async fn execution_engine_execute_with_tx_info_impl(
713        &mut self,
714        tx_info: &OnChainTransactionInfo,
715        override_transaction_kind: Option<TransactionKind>,
716        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
717    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
718        let tx_digest = &tx_info.tx_digest;
719        // TODO: Support system transactions.
720        if tx_info.sender_signed_data.transaction_data().is_system_tx() {
721            warn!(
722                "System TX replay not supported: {}, skipping transaction",
723                tx_digest
724            );
725            return Err(ReplayEngineError::TransactionNotSupported {
726                digest: *tx_digest,
727                reason: "System transaction".to_string(),
728            });
729        }
730
731        // Initialize the state necessary for execution
732        // Get the input objects
733        let input_objects = self.initialize_execution_env_state(tx_info).await?;
734        assert_eq!(
735            &input_objects.filter_shared_objects().len(),
736            &tx_info.shared_object_refs.len()
737        );
738        // At this point we have all the objects needed for replay
739
740        // This assumes we already initialized the protocol version table
741        // `protocol_version_epoch_table`
742        let protocol_config =
743            &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);
744
745        let metrics = self.metrics.clone();
746
747        let ov = self.executor_version;
748
749        // We could probably cache the executor per protocol config
750        let executor = get_executor(
751            ov,
752            protocol_config,
753            expensive_safety_check_config,
754            self.enable_profiler.clone(),
755        );
756
757        // All prep done
758        let expensive_checks = true;
759        let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
760        let certificate_deny_set = HashSet::new();
761        let (inner_store, gas_status, effects, result) = if let Ok(gas_status) = IotaGasStatus::new(
762            tx_info.gas_budget,
763            tx_info.gas_price,
764            tx_info.reference_gas_price,
765            protocol_config,
766        ) {
767            executor.execute_transaction_to_effects(
768                &self,
769                protocol_config,
770                metrics.clone(),
771                expensive_checks,
772                &certificate_deny_set,
773                &tx_info.executed_epoch,
774                tx_info.epoch_start_timestamp,
775                CheckedInputObjects::new_for_replay(input_objects.clone()),
776                tx_info.gas.clone(),
777                gas_status,
778                transaction_kind.clone(),
779                tx_info.sender,
780                *tx_digest,
781                &mut None,
782            )
783        } else {
784            unreachable!("Transaction was valid so gas status must be valid");
785        };
786
787        if let Err(err) = self.pretty_print_for_tracing(
788            &gas_status,
789            &executor,
790            tx_info,
791            &transaction_kind,
792            protocol_config,
793            metrics,
794            expensive_checks,
795            input_objects.clone(),
796        ) {
797            error!("Failed to pretty print for tracing: {:?}", err);
798        }
799
800        let all_required_objects = self.storage.all_objects();
801
802        let effects =
803            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
804
805        Ok(ExecutionSandboxState {
806            transaction_info: tx_info.clone(),
807            required_objects: all_required_objects,
808            local_exec_temporary_store: Some(inner_store),
809            local_exec_effects: effects,
810            local_exec_status: Some(result),
811        })
812    }
813
814    fn pretty_print_for_tracing(
815        &self,
816        gas_status: &IotaGasStatus,
817        executor: &Arc<dyn Executor + Send + Sync>,
818        tx_info: &OnChainTransactionInfo,
819        transaction_kind: &TransactionKind,
820        protocol_config: &ProtocolConfig,
821        metrics: Arc<LimitsMetrics>,
822        expensive_checks: bool,
823        input_objects: InputObjects,
824    ) -> anyhow::Result<()> {
825        trace!(target: "replay_gas_info", "{}", Pretty(gas_status));
826
827        let skip_checks = true;
828        if let ProgrammableTransaction(pt) = transaction_kind {
829            trace!(
830                target: "replay_ptb_info",
831                "{}",
832                Pretty(&FullPTB {
833                    ptb: pt.clone(),
834                    results: transform_command_results_to_annotated(
835                        executor,
836                        &self.clone(),
837                        executor.dev_inspect_transaction(
838                            &self,
839                            protocol_config,
840                            metrics,
841                            expensive_checks,
842                            &HashSet::new(),
843                            &tx_info.executed_epoch,
844                            tx_info.epoch_start_timestamp,
845                            CheckedInputObjects::new_for_replay(input_objects),
846                            tx_info.gas.clone(),
847                            IotaGasStatus::new(
848                                tx_info.gas_budget,
849                                tx_info.gas_price,
850                                tx_info.reference_gas_price,
851                                protocol_config,
852                            )?,
853                            transaction_kind.clone(),
854                            tx_info.sender,
855                            tx_info.sender_signed_data.digest(),
856                            skip_checks,
857                        )
858                        .3
859                        .unwrap_or_default(),
860                    )?,
861            }));
862        }
863        Ok(())
864    }
865
866    /// Must be called after `init_for_execution`
867    pub async fn execution_engine_execute_impl(
868        &mut self,
869        tx_digest: &TransactionDigest,
870        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
871    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
872        if self.is_remote_replay() {
873            assert!(
874                !self.protocol_version_system_package_table.is_empty()
875                    || !self.protocol_version_epoch_table.is_empty(),
876                "Required tables not populated. Must call `init_for_execution` before executing transactions"
877            );
878        }
879
880        let tx_info = if self.is_remote_replay() {
881            self.resolve_tx_components(tx_digest).await?
882        } else {
883            self.resolve_tx_components_from_dump(tx_digest).await?
884        };
885        self.execution_engine_execute_with_tx_info_impl(
886            &tx_info,
887            None,
888            expensive_safety_check_config,
889        )
890        .await
891    }
892
893    /// Executes a transaction with the state specified in `pre_run_sandbox`
894    /// This is useful for executing a transaction with a specific state
895    /// However if the state in invalid, the behavior is undefined.
896    pub async fn certificate_execute_with_sandbox_state(
897        pre_run_sandbox: &ExecutionSandboxState,
898    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
899        // These cannot be changed and are inherited from the sandbox state
900        let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
901        let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
902        let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
903        let protocol_config = ProtocolConfig::get_for_version(
904            pre_run_sandbox.transaction_info.protocol_version,
905            pre_run_sandbox.transaction_info.chain,
906        );
907        let required_objects = pre_run_sandbox.required_objects.clone();
908        let store = InMemoryStorage::new(required_objects.clone());
909
910        let transaction =
911            Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());
912
913        // TODO: This will not work for deleted shared objects. We need to persist that
914        // information in the sandbox. TODO: A lot of the following code is
915        // replicated in several places. We should introduce a few traits and
916        // make them shared so that we don't have to fix one by one when we have major
917        // execution layer changes.
918        let input_objects = store.read_input_objects_for_transaction(&transaction);
919        let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
920            VerifiedTransaction::new_unchecked(transaction),
921            executed_epoch,
922        );
923        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
924            &executable,
925            input_objects,
926            &protocol_config,
927            reference_gas_price,
928        )
929        .unwrap();
930        let (kind, signer, gas) = executable.transaction_data().execution_parts();
931        let executor = iota_execution::executor(&protocol_config, true, None).unwrap();
932        let (_, _, effects, exec_res) = executor.execute_transaction_to_effects(
933            &store,
934            &protocol_config,
935            Arc::new(LimitsMetrics::new(&Registry::new())),
936            true,
937            &HashSet::new(),
938            &executed_epoch,
939            epoch_start_timestamp,
940            input_objects,
941            gas,
942            gas_status,
943            kind,
944            signer,
945            *executable.digest(),
946            &mut None,
947        );
948
949        let effects =
950            IotaTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
951
952        Ok(ExecutionSandboxState {
953            transaction_info: pre_run_sandbox.transaction_info.clone(),
954            required_objects,
955            local_exec_temporary_store: None, // We dont capture it for cert exec run
956            local_exec_effects: effects,
957            local_exec_status: Some(exec_res),
958        })
959    }
960
961    /// Must be called after `init_for_execution`
962    /// This executes from
963    /// `iota_core::authority::AuthorityState::try_execute_immediately`
964    pub async fn certificate_execute(
965        &mut self,
966        tx_digest: &TransactionDigest,
967        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
968    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
969        // Use the lighterweight execution engine to get the pre-run state
970        let pre_run_sandbox = self
971            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
972            .await?;
973        Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
974    }
975
976    /// Must be called after `init_for_execution`
977    /// This executes from
978    /// `iota_adapter::execution_engine::execute_transaction_to_effects`
979    pub async fn execution_engine_execute(
980        &mut self,
981        tx_digest: &TransactionDigest,
982        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
983    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
984        let sandbox_state = self
985            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
986            .await?;
987
988        Ok(sandbox_state)
989    }
990
991    pub async fn execute_state_dump(
992        &mut self,
993        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
994    ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
995        assert!(!self.is_remote_replay());
996
997        let d = match self.fetcher.clone() {
998            Fetchers::NodeStateDump(d) => d,
999            _ => panic!("Invalid fetcher for state dump"),
1000        };
1001        let tx_digest = d.node_state_dump.clone().tx_digest;
1002        let sandbox_state = self
1003            .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1004            .await?;
1005
1006        Ok((sandbox_state, d.node_state_dump))
1007    }
1008
1009    pub async fn execute_transaction(
1010        &mut self,
1011        tx_digest: &TransactionDigest,
1012        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1013        use_authority: bool,
1014        executor_version: Option<i64>,
1015        protocol_version: Option<i64>,
1016        enable_profiler: Option<PathBuf>,
1017        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1018    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1019        self.executor_version = executor_version;
1020        self.protocol_version = protocol_version;
1021        self.enable_profiler = enable_profiler;
1022        self.config_and_versions = config_and_versions;
1023        if use_authority {
1024            self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1025                .await
1026        } else {
1027            self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1028                .await
1029        }
1030    }
1031    fn system_package_ids(_protocol_version: u64) -> Vec<ObjectID> {
1032        BuiltInFramework::all_package_ids()
1033    }
1034
1035    /// This is the only function which accesses the network during execution
1036    pub fn get_or_download_object(
1037        &self,
1038        obj_id: &ObjectID,
1039        package_expected: bool,
1040    ) -> Result<Option<Object>, ReplayEngineError> {
1041        if package_expected {
1042            if let Some(obj) = self
1043                .storage
1044                .package_cache
1045                .lock()
1046                .expect("Cannot lock")
1047                .get(obj_id)
1048            {
1049                return Ok(Some(obj.clone()));
1050            };
1051            // Check if its a system package because we must've downloaded all
1052            // TODO: Will return this check once we can download completely for
1053            // other networks assert!(
1054            //     !self.system_package_ids().contains(obj_id),
1055            //     "All system packages should be downloaded already"
1056            // );
1057        } else if let Some(obj) = self
1058            .storage
1059            .live_objects_store
1060            .lock()
1061            .expect("Can't lock")
1062            .get(obj_id)
1063        {
1064            return Ok(Some(obj.clone()));
1065        }
1066
1067        let Some(o) = self.download_latest_object(obj_id)? else {
1068            return Ok(None);
1069        };
1070
1071        if o.is_package() {
1072            assert!(
1073                package_expected,
1074                "Did not expect package but downloaded object is a package: {obj_id}"
1075            );
1076
1077            self.storage
1078                .package_cache
1079                .lock()
1080                .expect("Cannot lock")
1081                .insert(*obj_id, o.clone());
1082        }
1083        let o_ref = o.compute_object_reference();
1084        self.storage
1085            .object_version_cache
1086            .lock()
1087            .expect("Cannot lock")
1088            .insert((o_ref.0, o_ref.1), o.clone());
1089        Ok(Some(o))
1090    }
1091
1092    pub fn is_remote_replay(&self) -> bool {
1093        matches!(self.fetcher, Fetchers::Remote(_))
1094    }
1095
1096    /// Must be called after `populate_protocol_version_tables`
1097    pub fn system_package_versions_for_protocol_version(
1098        &self,
1099        protocol_version: u64,
1100    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1101        match &self.fetcher {
1102            Fetchers::Remote(_) => Ok(self
1103                .protocol_version_system_package_table
1104                .get(&protocol_version)
1105                .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1106                    protocol_version,
1107                })?
1108                .clone()
1109                .into_iter()
1110                .collect()),
1111
1112            Fetchers::NodeStateDump(d) => Ok(d
1113                .node_state_dump
1114                .relevant_system_packages
1115                .iter()
1116                .map(|w| (w.id, w.version, w.digest))
1117                .map(|q| (q.0, q.1))
1118                .collect()),
1119        }
1120    }
1121
1122    pub async fn protocol_ver_to_epoch_map(
1123        &self,
1124    ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
1125        let mut range_map = BTreeMap::new();
1126        let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;
1127
1128        // Exception for Genesis: Protocol version 1 at epoch 0
1129        let mut tx_digest = *self
1130            .fetcher
1131            .get_checkpoint_txs(0)
1132            .await?
1133            .first()
1134            .expect("Genesis TX must be in first checkpoint");
1135        // Somehow the genesis TX did not emit any event, but we know it was the start
1136        // of version 1 So we need to manually add this range
1137        let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
1138            (0, 1, Some(0u64));
1139
1140        let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
1141            (start_epoch, start_protocol_version, start_checkpoint);
1142
1143        (start_epoch, start_protocol_version, start_checkpoint) =
1144            (curr_epoch, curr_protocol_version, curr_checkpoint);
1145
1146        // This is the final tx digest for the epoch change. We need this to track the
1147        // final checkpoint
1148        let mut end_epoch_tx_digest = tx_digest;
1149
1150        for event in epoch_change_events {
1151            (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
1152            end_epoch_tx_digest = event.id.tx_digest;
1153
1154            if start_protocol_version == curr_protocol_version {
1155                // Same range
1156                continue;
1157            }
1158
1159            // Change in prot version
1160            // Find the last checkpoint
1161            curr_checkpoint = self
1162                .fetcher
1163                .get_transaction(&event.id.tx_digest)
1164                .await?
1165                .checkpoint;
1166            // Insert the last range
1167            range_map.insert(
1168                start_protocol_version,
1169                ProtocolVersionSummary {
1170                    protocol_version: start_protocol_version,
1171                    epoch_start: start_epoch,
1172                    epoch_end: curr_epoch - 1,
1173                    checkpoint_start: start_checkpoint,
1174                    checkpoint_end: curr_checkpoint.map(|x| x - 1),
1175                    epoch_change_tx: tx_digest,
1176                },
1177            );
1178
1179            start_epoch = curr_epoch;
1180            start_protocol_version = curr_protocol_version;
1181            tx_digest = event.id.tx_digest;
1182            start_checkpoint = curr_checkpoint;
1183        }
1184
1185        // Insert the last range
1186        range_map.insert(
1187            curr_protocol_version,
1188            ProtocolVersionSummary {
1189                protocol_version: curr_protocol_version,
1190                epoch_start: start_epoch,
1191                epoch_end: curr_epoch,
1192                checkpoint_start: curr_checkpoint,
1193                checkpoint_end: self
1194                    .fetcher
1195                    .get_transaction(&end_epoch_tx_digest)
1196                    .await?
1197                    .checkpoint,
1198                epoch_change_tx: tx_digest,
1199            },
1200        );
1201
1202        Ok(range_map)
1203    }
1204
1205    pub fn protocol_version_for_epoch(
1206        epoch: u64,
1207        mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1208    ) -> u64 {
1209        // Naive impl but works for now
1210        // Can improve with range algos & data structures
1211        let mut version = 1;
1212        for (k, v) in mp.iter().rev() {
1213            if v.1 <= epoch {
1214                version = *k;
1215                break;
1216            }
1217        }
1218        version
1219    }
1220
1221    pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1222        self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1223
1224        let system_package_revisions = self.system_package_versions().await?;
1225
1226        // This can be more efficient but small footprint so okay for now
1227        // Table is sorted from earliest to latest
1228        for (
1229            prot_ver,
1230            ProtocolVersionSummary {
1231                epoch_change_tx: tx_digest,
1232                ..
1233            },
1234        ) in self.protocol_version_epoch_table.clone()
1235        {
1236            // Use the previous versions protocol version table
1237            let mut working = if prot_ver <= 1 {
1238                BTreeMap::new()
1239            } else {
1240                self.protocol_version_system_package_table
1241                    .iter()
1242                    .rev()
1243                    .find(|(ver, _)| **ver <= prot_ver)
1244                    .expect("Prev entry must exist")
1245                    .1
1246                    .clone()
1247            };
1248
1249            for (id, versions) in system_package_revisions.iter() {
1250                // Oldest appears first in list, so reverse
1251                for ver in versions.iter().rev() {
1252                    if ver.1 == tx_digest {
1253                        // Found the version for this protocol version
1254                        working.insert(*id, ver.0);
1255                        break;
1256                    }
1257                }
1258            }
1259            self.protocol_version_system_package_table
1260                .insert(prot_ver, working);
1261        }
1262        Ok(())
1263    }
1264
1265    pub async fn system_package_versions(
1266        &self,
1267    ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1268    {
1269        let system_package_ids = Self::system_package_ids(
1270            *self
1271                .protocol_version_epoch_table
1272                .keys()
1273                .peekable()
1274                .last()
1275                .expect("Protocol version epoch table not populated"),
1276        );
1277        let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1278
1279        let mut mapping = BTreeMap::new();
1280
1281        // Extract all the transactions which created or mutated this object
1282        while !system_package_objs.is_empty() {
1283            // For the given object and its version, record the transaction which upgraded
1284            // or created it
1285            let previous_txs: Vec<_> = system_package_objs
1286                .iter()
1287                .map(|o| (o.compute_object_reference(), o.previous_transaction))
1288                .collect();
1289
1290            previous_txs.iter().for_each(|((id, ver, _), tx)| {
1291                mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1292            });
1293
1294            // Next round
1295            // Get the previous version of each object if exists
1296            let previous_ver_refs: Vec<_> = previous_txs
1297                .iter()
1298                .filter_map(|(q, _)| {
1299                    let prev_ver = u64::from(q.1) - 1;
1300                    if prev_ver == 0 {
1301                        None
1302                    } else {
1303                        Some((q.0, SequenceNumber::from(prev_ver)))
1304                    }
1305                })
1306                .collect();
1307            system_package_objs = match self.multi_download(&previous_ver_refs).await {
1308                Ok(packages) => packages,
1309                Err(ReplayEngineError::ObjectNotExist { id }) => {
1310                    // This happens when the RPC server prunes older object
1311                    // Replays in the current protocol version will work but old ones might not
1312                    // as we cannot fetch the package
1313                    warn!(
1314                        "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1315                        id
1316                    );
1317                    break;
1318                }
1319                Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1320                    // This happens when the RPC server prunes older object
1321                    // Replays in the current protocol version will work but old ones might not
1322                    // as we cannot fetch the package
1323                    warn!(
1324                        "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1325                        id, version
1326                    );
1327                    break;
1328                }
1329                Err(ReplayEngineError::ObjectVersionTooHigh {
1330                    id,
1331                    asked_version,
1332                    latest_version,
1333                }) => {
1334                    warn!(
1335                        "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1336                        id, asked_version, latest_version
1337                    );
1338                    break;
1339                }
1340                Err(ReplayEngineError::ObjectDeleted {
1341                    id,
1342                    version,
1343                    digest,
1344                }) => {
1345                    // This happens when the RPC server prunes older object
1346                    // Replays in the current protocol version will work but old ones might not
1347                    // as we cannot fetch the package
1348                    warn!(
1349                        "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1350                        id, version, digest
1351                    );
1352                    break;
1353                }
1354                Err(e) => return Err(e),
1355            };
1356        }
1357        Ok(mapping)
1358    }
1359
1360    pub async fn get_protocol_config(
1361        &self,
1362        epoch_id: EpochId,
1363        chain: Chain,
1364    ) -> Result<ProtocolConfig, ReplayEngineError> {
1365        match self.protocol_version {
1366            Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1367            Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1368            None => self
1369                .protocol_version_epoch_table
1370                .iter()
1371                .rev()
1372                .find(|(_, rg)| epoch_id >= rg.epoch_start)
1373                .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1374                .unwrap_or_else(|| {
1375                    Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1376                }),
1377        }
1378    }
1379
1380    pub async fn checkpoints_for_epoch(
1381        &self,
1382        epoch_id: u64,
1383    ) -> Result<(u64, u64), ReplayEngineError> {
1384        let epoch_change_events = self
1385            .fetcher
1386            .get_epoch_change_events(true)
1387            .await?
1388            .into_iter()
1389            .collect::<Vec<_>>();
1390        let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1391            (0, 1)
1392        } else {
1393            let idx = epoch_change_events
1394                .iter()
1395                .position(|ev| match extract_epoch_and_version(ev.clone()) {
1396                    Ok((epoch, _)) => epoch == epoch_id,
1397                    Err(_) => false,
1398                })
1399                .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1400            let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1401            (
1402                self.fetcher
1403                    .get_transaction(&epoch_change_tx)
1404                    .await?
1405                    .checkpoint
1406                    .unwrap_or_else(|| {
1407                        panic!(
1408                            "Checkpoint for transaction {} not present. Could be due to pruning",
1409                            epoch_change_tx
1410                        )
1411                    }),
1412                idx,
1413            )
1414        };
1415
1416        let next_epoch_change_tx = epoch_change_events
1417            .get(start_epoch_idx + 1)
1418            .map(|v| v.id.tx_digest)
1419            .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1420
1421        let next_epoch_checkpoint = self
1422            .fetcher
1423            .get_transaction(&next_epoch_change_tx)
1424            .await?
1425            .checkpoint
1426            .unwrap_or_else(|| {
1427                panic!(
1428                    "Checkpoint for transaction {} not present. Could be due to pruning",
1429                    next_epoch_change_tx
1430                )
1431            });
1432
1433        Ok((start_checkpoint, next_epoch_checkpoint - 1))
1434    }
1435
1436    pub async fn get_epoch_start_timestamp_and_rgp(
1437        &self,
1438        epoch_id: u64,
1439        tx_digest: &TransactionDigest,
1440    ) -> Result<(u64, u64), ReplayEngineError> {
1441        if epoch_id == 0 {
1442            return Err(ReplayEngineError::TransactionNotSupported {
1443                digest: *tx_digest,
1444                reason: "Transactions from epoch 0 not supported".to_string(),
1445            });
1446        }
1447        self.fetcher
1448            .get_epoch_start_timestamp_and_rgp(epoch_id)
1449            .await
1450    }
1451
1452    fn add_config_objects_if_needed(
1453        &self,
1454        status: &IotaExecutionStatus,
1455    ) -> Vec<(ObjectID, SequenceNumber)> {
1456        match parse_effect_error_for_denied_coins(status) {
1457            Some(coin_type) => {
1458                let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1459                    panic!(
1460                        "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1461                    );
1462                };
1463                // NB: the version of the deny list object doesn't matter
1464                if !config_id_and_version
1465                    .iter()
1466                    .any(|(id, _)| id == &IOTA_DENY_LIST_OBJECT_ID)
1467                {
1468                    let deny_list_oid_version = self.download_latest_object(&IOTA_DENY_LIST_OBJECT_ID)
1469                        .ok()
1470                        .flatten()
1471                        .expect("Unable to download the deny list object for a transaction that requires it")
1472                        .version();
1473                    config_id_and_version.push((IOTA_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1474                }
1475                config_id_and_version
1476            }
1477            None => vec![],
1478        }
1479    }
1480
1481    async fn resolve_tx_components(
1482        &self,
1483        tx_digest: &TransactionDigest,
1484    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1485        assert!(self.is_remote_replay());
1486        // Fetch full transaction content
1487        let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1488        let sender = match tx_info.clone().transaction.unwrap().data {
1489            iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.sender,
1490        };
1491        let IotaTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1492
1493        let config_objects = self.add_config_objects_if_needed(effects.status());
1494
1495        let raw_tx_bytes = tx_info.clone().raw_transaction;
1496        let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1497        let input_objs = orig_tx
1498            .transaction_data()
1499            .input_objects()
1500            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1501        let tx_kind_orig = orig_tx.transaction_data().kind();
1502
1503        // Download the objects at the version right before the execution of this TX
1504        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1505
1506        let shared_object_refs: Vec<ObjectRef> = effects
1507            .shared_objects()
1508            .iter()
1509            .map(|so_ref| {
1510                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1511                    unimplemented!(
1512                        "Replay of deleted shared object transactions is not supported yet"
1513                    );
1514                } else {
1515                    so_ref.to_object_ref()
1516                }
1517            })
1518            .collect();
1519        let gas_data = match tx_info.clone().transaction.unwrap().data {
1520            iota_json_rpc_types::IotaTransactionBlockData::V1(tx) => tx.gas_data,
1521        };
1522        let gas_object_refs: Vec<_> = gas_data
1523            .payment
1524            .iter()
1525            .map(|obj_ref| obj_ref.to_object_ref())
1526            .collect();
1527        let receiving_objs = orig_tx
1528            .transaction_data()
1529            .receiving_objects()
1530            .into_iter()
1531            .map(|(obj_id, version, _)| (obj_id, version))
1532            .collect();
1533
1534        let epoch_id = effects.executed_epoch;
1535        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1536
1537        // Extract the epoch start timestamp
1538        let (epoch_start_timestamp, reference_gas_price) = self
1539            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1540            .await?;
1541
1542        Ok(OnChainTransactionInfo {
1543            kind: tx_kind_orig.clone(),
1544            sender,
1545            modified_at_versions,
1546            input_objects: input_objs,
1547            shared_object_refs,
1548            gas: gas_object_refs,
1549            gas_budget: gas_data.budget,
1550            gas_price: gas_data.price,
1551            executed_epoch: epoch_id,
1552            dependencies: effects.dependencies().to_vec(),
1553            effects: IotaTransactionBlockEffects::V1(effects),
1554            receiving_objs,
1555            config_objects,
1556            // Find the protocol version for this epoch
1557            // This assumes we already initialized the protocol version table
1558            // `protocol_version_epoch_table`
1559            protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1560            tx_digest: *tx_digest,
1561            epoch_start_timestamp,
1562            sender_signed_data: orig_tx.clone(),
1563            reference_gas_price,
1564            chain,
1565        })
1566    }
1567
1568    async fn resolve_tx_components_from_dump(
1569        &self,
1570        tx_digest: &TransactionDigest,
1571    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1572        assert!(!self.is_remote_replay());
1573
1574        let dp = self.fetcher.as_node_state_dump();
1575
1576        let sender = dp
1577            .node_state_dump
1578            .sender_signed_data
1579            .transaction_data()
1580            .sender();
1581        let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1582        let effects = dp.node_state_dump.computed_effects.clone();
1583        let effects = IotaTransactionBlockEffects::try_from(effects).unwrap();
1584        // Config objects don't show up in the node state dump so they need to be
1585        // provided.
1586        let config_objects = self.add_config_objects_if_needed(effects.status());
1587
1588        // Fetch full transaction content
1589        // let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1590
1591        let input_objs = orig_tx
1592            .transaction_data()
1593            .input_objects()
1594            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1595        let tx_kind_orig = orig_tx.transaction_data().kind();
1596
1597        // Download the objects at the version right before the execution of this TX
1598        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1599
1600        let shared_object_refs: Vec<ObjectRef> = effects
1601            .shared_objects()
1602            .iter()
1603            .map(|so_ref| {
1604                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1605                    unimplemented!(
1606                        "Replay of deleted shared object transactions is not supported yet"
1607                    );
1608                } else {
1609                    so_ref.to_object_ref()
1610                }
1611            })
1612            .collect();
1613        let gas_data = orig_tx.transaction_data().gas_data();
1614        let gas_object_refs: Vec<_> = gas_data.clone().payment;
1615        let receiving_objs = orig_tx
1616            .transaction_data()
1617            .receiving_objects()
1618            .into_iter()
1619            .map(|(obj_id, version, _)| (obj_id, version))
1620            .collect();
1621
1622        let epoch_id = dp.node_state_dump.executed_epoch;
1623
1624        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1625
1626        let protocol_config =
1627            ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1628        // Extract the epoch start timestamp
1629        let (epoch_start_timestamp, reference_gas_price) = self
1630            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1631            .await?;
1632
1633        Ok(OnChainTransactionInfo {
1634            kind: tx_kind_orig.clone(),
1635            sender,
1636            modified_at_versions,
1637            input_objects: input_objs,
1638            shared_object_refs,
1639            gas: gas_object_refs,
1640            gas_budget: gas_data.budget,
1641            gas_price: gas_data.price,
1642            executed_epoch: epoch_id,
1643            dependencies: effects.dependencies().to_vec(),
1644            effects,
1645            receiving_objs,
1646            config_objects,
1647            protocol_version: protocol_config.version,
1648            tx_digest: *tx_digest,
1649            epoch_start_timestamp,
1650            sender_signed_data: orig_tx.clone(),
1651            reference_gas_price,
1652            chain,
1653        })
1654    }
1655
1656    async fn resolve_download_input_objects(
1657        &mut self,
1658        tx_info: &OnChainTransactionInfo,
1659        deleted_shared_objects: Vec<ObjectRef>,
1660    ) -> Result<InputObjects, ReplayEngineError> {
1661        // Download the input objects
1662        let mut package_inputs = vec![];
1663        let mut imm_owned_inputs = vec![];
1664        let mut shared_inputs = vec![];
1665        let mut deleted_shared_info_map = BTreeMap::new();
1666
1667        // for deleted shared objects, we need to look at the transaction dependencies
1668        // to find the correct transaction dependency for a deleted shared
1669        // object.
1670        if !deleted_shared_objects.is_empty() {
1671            for tx_digest in tx_info.dependencies.iter() {
1672                let tx_info = self.resolve_tx_components(tx_digest).await?;
1673                for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1674                    deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1675                }
1676            }
1677        }
1678
1679        tx_info
1680            .input_objects
1681            .iter()
1682            .map(|kind| match kind {
1683                InputObjectKind::MovePackage(i) => {
1684                    package_inputs.push(*i);
1685                    Ok(())
1686                }
1687                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1688                    imm_owned_inputs.push((o_ref.0, o_ref.1));
1689                    Ok(())
1690                }
1691                InputObjectKind::SharedMoveObject {
1692                    id,
1693                    initial_shared_version: _,
1694                    mutable: _,
1695                } if !deleted_shared_info_map.contains_key(id) => {
1696                    // We already downloaded
1697                    if let Some(o) = self
1698                        .storage
1699                        .live_objects_store
1700                        .lock()
1701                        .expect("Can't lock")
1702                        .get(id)
1703                    {
1704                        shared_inputs.push(o.clone());
1705                        Ok(())
1706                    } else {
1707                        Err(ReplayEngineError::InternalCacheInvariantViolation {
1708                            id: *id,
1709                            version: None,
1710                        })
1711                    }
1712                }
1713                _ => Ok(()),
1714            })
1715            .collect::<Result<Vec<_>, _>>()?;
1716
1717        // Download the imm and owned objects
1718        let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1719
1720        // For packages, download latest if non framework
1721        // If framework, download relevant for the current protocol version
1722        in_objs.extend(
1723            self.multi_download_relevant_packages_and_store(
1724                package_inputs,
1725                tx_info.protocol_version.as_u64(),
1726            )
1727            .await?,
1728        );
1729        // Add shared objects
1730        in_objs.extend(shared_inputs);
1731
1732        // TODO(Zhe): Account for cancelled transaction assigned version here, and
1733        // tests.
1734        let resolved_input_objs = tx_info
1735            .input_objects
1736            .iter()
1737            .flat_map(|kind| match kind {
1738                InputObjectKind::MovePackage(i) => {
1739                    // Okay to unwrap since we downloaded it
1740                    Some(ObjectReadResult::new(
1741                        *kind,
1742                        self.storage
1743                            .package_cache
1744                            .lock()
1745                            .expect("Cannot lock")
1746                            .get(i)
1747                            .unwrap_or(
1748                                &self
1749                                    .download_latest_object(i)
1750                                    .expect("Object download failed")
1751                                    .expect("Object not found on chain"),
1752                            )
1753                            .clone()
1754                            .into(),
1755                    ))
1756                }
1757                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1758                    *kind,
1759                    self.storage
1760                        .object_version_cache
1761                        .lock()
1762                        .expect("Cannot lock")
1763                        .get(&(o_ref.0, o_ref.1))
1764                        .unwrap()
1765                        .clone()
1766                        .into(),
1767                )),
1768                InputObjectKind::SharedMoveObject { id, .. }
1769                    if !deleted_shared_info_map.contains_key(id) =>
1770                {
1771                    // we already downloaded
1772                    Some(ObjectReadResult::new(
1773                        *kind,
1774                        self.storage
1775                            .live_objects_store
1776                            .lock()
1777                            .expect("Can't lock")
1778                            .get(id)
1779                            .unwrap()
1780                            .clone()
1781                            .into(),
1782                    ))
1783                }
1784                InputObjectKind::SharedMoveObject { id, .. } => {
1785                    let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1786                    Some(ObjectReadResult::new(
1787                        *kind,
1788                        ObjectReadResultKind::DeletedSharedObject(*version, *digest),
1789                    ))
1790                }
1791            })
1792            .collect();
1793
1794        Ok(InputObjects::new(resolved_input_objs))
1795    }
1796
1797    /// Given the OnChainTransactionInfo, download and store the input objects,
1798    /// and other info necessary for execution
1799    async fn initialize_execution_env_state(
1800        &mut self,
1801        tx_info: &OnChainTransactionInfo,
1802    ) -> Result<InputObjects, ReplayEngineError> {
1803        // We need this for other activities in this session
1804        self.current_protocol_version = tx_info.protocol_version.as_u64();
1805
1806        // Download the objects at the version right before the execution of this TX
1807        self.multi_download_and_store(&tx_info.modified_at_versions)
1808            .await?;
1809
1810        let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1811            .shared_object_refs
1812            .iter()
1813            .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1814
1815        // Download shared objects at the version right before the execution of this TX
1816        let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1817        self.multi_download_and_store(&shared_refs).await?;
1818
1819        // Download gas (although this should already be in cache from modified at
1820        // versions?)
1821        let gas_refs: Vec<_> = tx_info.gas.iter().map(|w| (w.0, w.1)).collect();
1822        self.multi_download_and_store(&gas_refs).await?;
1823
1824        // Fetch the input objects we know from the raw transaction
1825        let input_objs = self
1826            .resolve_download_input_objects(tx_info, deleted_shared_refs)
1827            .await?;
1828
1829        // Fetch the receiving objects
1830        self.multi_download_and_store(&tx_info.receiving_objs)
1831            .await?;
1832
1833        // Fetch specified config objects if any
1834        self.multi_download_and_store(&tx_info.config_objects)
1835            .await?;
1836
1837        // Prep the object runtime for dynamic fields
1838        // Download the child objects accessed at the version right before the execution
1839        // of this TX
1840        let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1841        self.multi_download_and_store(&loaded_child_refs).await?;
1842        tokio::task::yield_now().await;
1843
1844        Ok(input_objs)
1845    }
1846}
1847
1848// <---------------------  Implement necessary traits for LocalExec to work with
1849// exec engine ----------------------->
1850
1851impl BackingPackageStore for LocalExec {
1852    /// In this case we might need to download a dependency package which was
1853    /// not present in the modified at versions list because packages are
1854    /// immutable
1855    fn get_package_object(&self, package_id: &ObjectID) -> IotaResult<Option<PackageObject>> {
1856        fn inner(self_: &LocalExec, package_id: &ObjectID) -> IotaResult<Option<Object>> {
1857            // If package not present fetch it from the network
1858            self_
1859                .get_or_download_object(package_id, true /* we expect a Move package */)
1860                .map_err(|e| IotaError::Storage(e.to_string()))
1861        }
1862
1863        let res = inner(self, package_id);
1864        self.exec_store_events
1865            .lock()
1866            .expect("Unable to lock events list")
1867            .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1868                package_id: *package_id,
1869                result: res.clone(),
1870            });
1871        res.map(|o| o.map(PackageObject::new))
1872    }
1873}
1874
1875impl ChildObjectResolver for LocalExec {
1876    /// This uses `get_object`, which does not download from the network
1877    /// Hence all objects must be in store already
1878    fn read_child_object(
1879        &self,
1880        parent: &ObjectID,
1881        child: &ObjectID,
1882        child_version_upper_bound: SequenceNumber,
1883    ) -> IotaResult<Option<Object>> {
1884        fn inner(
1885            self_: &LocalExec,
1886            parent: &ObjectID,
1887            child: &ObjectID,
1888            child_version_upper_bound: SequenceNumber,
1889        ) -> IotaResult<Option<Object>> {
1890            let child_object =
1891                match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1892                    None => return Ok(None),
1893                    Some(o) => o,
1894                };
1895            let child_version = child_object.version();
1896            if child_object.version() > child_version_upper_bound {
1897                return Err(IotaError::Unknown(format!(
1898                    "Invariant Violation. Replay loaded child_object {child} at version \
1899                    {child_version} but expected the version to be <= {child_version_upper_bound}"
1900                )));
1901            }
1902            let parent = *parent;
1903            if child_object.owner != Owner::ObjectOwner(parent.into()) {
1904                return Err(IotaError::InvalidChildObjectAccess {
1905                    object: *child,
1906                    given_parent: parent,
1907                    actual_owner: child_object.owner,
1908                });
1909            }
1910            Ok(Some(child_object))
1911        }
1912
1913        let res = inner(self, parent, child, child_version_upper_bound);
1914        self.exec_store_events
1915            .lock()
1916            .expect("Unable to lock events list")
1917            .push(
1918                ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1919                    parent: *parent,
1920                    child: *child,
1921                    result: res.clone(),
1922                },
1923            );
1924        res
1925    }
1926
1927    fn get_object_received_at_version(
1928        &self,
1929        owner: &ObjectID,
1930        receiving_object_id: &ObjectID,
1931        receive_object_at_version: SequenceNumber,
1932        _epoch_id: EpochId,
1933    ) -> IotaResult<Option<Object>> {
1934        fn inner(
1935            self_: &LocalExec,
1936            owner: &ObjectID,
1937            receiving_object_id: &ObjectID,
1938            receive_object_at_version: SequenceNumber,
1939        ) -> IotaResult<Option<Object>> {
1940            let recv_object = match self_.get_object(receiving_object_id)? {
1941                None => return Ok(None),
1942                Some(o) => o,
1943            };
1944            if recv_object.version() != receive_object_at_version {
1945                return Err(IotaError::Unknown(format!(
1946                    "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
1947                    {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
1948                )));
1949            }
1950            if recv_object.owner != Owner::AddressOwner((*owner).into()) {
1951                return Ok(None);
1952            }
1953            Ok(Some(recv_object))
1954        }
1955
1956        let res = inner(self, owner, receiving_object_id, receive_object_at_version);
1957        self.exec_store_events
1958            .lock()
1959            .expect("Unable to lock events list")
1960            .push(ExecutionStoreEvent::ReceiveObject {
1961                owner: *owner,
1962                receive: *receiving_object_id,
1963                receive_at_version: receive_object_at_version,
1964                result: res.clone(),
1965            });
1966        res
1967    }
1968}
1969
1970impl ResourceResolver for LocalExec {
1971    type Error = IotaError;
1972
1973    /// In this case we might need to download a Move object on the fly which
1974    /// was not present in the modified at versions list because packages
1975    /// are immutable
1976    fn get_resource(
1977        &self,
1978        address: &AccountAddress,
1979        typ: &StructTag,
1980    ) -> IotaResult<Option<Vec<u8>>> {
1981        fn inner(
1982            self_: &LocalExec,
1983            address: &AccountAddress,
1984            typ: &StructTag,
1985        ) -> IotaResult<Option<Vec<u8>>> {
1986            // If package not present fetch it from the network or some remote location
1987            let Some(object) = self_.get_or_download_object(
1988                &ObjectID::from(*address),
1989                false, // we expect a Move obj
1990            )?
1991            else {
1992                return Ok(None);
1993            };
1994
1995            match &object.data {
1996                Data::Move(m) => {
1997                    assert!(
1998                        m.is_type(typ),
1999                        "Invariant violation: ill-typed object in storage \
2000                        or bad object request from caller"
2001                    );
2002                    Ok(Some(m.contents().to_vec()))
2003                }
2004                other => unimplemented!(
2005                    "Bad object lookup: expected Move object, but got {:#?}",
2006                    other
2007                ),
2008            }
2009        }
2010
2011        let res = inner(self, address, typ);
2012        self.exec_store_events
2013            .lock()
2014            .expect("Unable to lock events list")
2015            .push(ExecutionStoreEvent::ResourceResolverGetResource {
2016                address: *address,
2017                typ: typ.clone(),
2018                result: res.clone(),
2019            });
2020        res
2021    }
2022}
2023
2024impl ModuleResolver for LocalExec {
2025    type Error = IotaError;
2026
2027    /// This fetches a module which must already be present in the store
2028    /// We do not download
2029    fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2030        fn inner(self_: &LocalExec, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2031            get_module(self_, module_id)
2032        }
2033
2034        let res = inner(self, module_id);
2035        self.exec_store_events
2036            .lock()
2037            .expect("Unable to lock events list")
2038            .push(ExecutionStoreEvent::ModuleResolverGetModule {
2039                module_id: module_id.clone(),
2040                result: res.clone(),
2041            });
2042        res
2043    }
2044}
2045
2046impl ModuleResolver for &mut LocalExec {
2047    type Error = IotaError;
2048
2049    fn get_module(&self, module_id: &ModuleId) -> IotaResult<Option<Vec<u8>>> {
2050        // Recording event here will be double-counting since its already recorded in
2051        // the get_module fn
2052        (**self).get_module(module_id)
2053    }
2054}
2055
2056impl ObjectStore for LocalExec {
2057    /// The object must be present in store by normal process we used to
2058    /// backfill store in init We dont download if not present
2059    fn get_object(
2060        &self,
2061        object_id: &ObjectID,
2062    ) -> iota_types::storage::error::Result<Option<Object>> {
2063        let res = self
2064            .storage
2065            .live_objects_store
2066            .lock()
2067            .expect("Can't lock")
2068            .get(object_id)
2069            .cloned();
2070        self.exec_store_events
2071            .lock()
2072            .expect("Unable to lock events list")
2073            .push(ExecutionStoreEvent::ObjectStoreGetObject {
2074                object_id: *object_id,
2075                result: Ok(res.clone()),
2076            });
2077        Ok(res)
2078    }
2079
2080    /// The object must be present in store by normal process we used to
2081    /// backfill store in init We dont download if not present
2082    fn get_object_by_key(
2083        &self,
2084        object_id: &ObjectID,
2085        version: VersionNumber,
2086    ) -> iota_types::storage::error::Result<Option<Object>> {
2087        let res = self
2088            .storage
2089            .live_objects_store
2090            .lock()
2091            .expect("Can't lock")
2092            .get(object_id)
2093            .and_then(|obj| {
2094                if obj.version() == version {
2095                    Some(obj.clone())
2096                } else {
2097                    None
2098                }
2099            });
2100
2101        self.exec_store_events
2102            .lock()
2103            .expect("Unable to lock events list")
2104            .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2105                object_id: *object_id,
2106                version,
2107                result: Ok(res.clone()),
2108            });
2109
2110        Ok(res)
2111    }
2112}
2113
2114impl ObjectStore for &mut LocalExec {
2115    fn get_object(
2116        &self,
2117        object_id: &ObjectID,
2118    ) -> iota_types::storage::error::Result<Option<Object>> {
2119        // Recording event here will be double-counting since its already recorded in
2120        // the get_module fn
2121        (**self).get_object(object_id)
2122    }
2123
2124    fn get_object_by_key(
2125        &self,
2126        object_id: &ObjectID,
2127        version: VersionNumber,
2128    ) -> iota_types::storage::error::Result<Option<Object>> {
2129        // Recording event here will be double-counting since its already recorded in
2130        // the get_module fn
2131        (**self).get_object_by_key(object_id, version)
2132    }
2133}
2134
2135impl GetModule for LocalExec {
2136    type Error = IotaError;
2137    type Item = CompiledModule;
2138
2139    fn get_module_by_id(&self, id: &ModuleId) -> IotaResult<Option<Self::Item>> {
2140        let res = get_module_by_id(self, id);
2141
2142        self.exec_store_events
2143            .lock()
2144            .expect("Unable to lock events list")
2145            .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2146                id: id.clone(),
2147                result: res.clone(),
2148            });
2149        res
2150    }
2151}
2152
2153// <--------------------- Util functions ----------------------->
2154
2155pub fn get_executor(
2156    executor_version_override: Option<i64>,
2157    protocol_config: &ProtocolConfig,
2158    _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2159    enable_profiler: Option<PathBuf>,
2160) -> Arc<dyn Executor + Send + Sync> {
2161    let protocol_config = executor_version_override
2162        .map(|q| {
2163            let ver = if q < 0 {
2164                ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2165            } else {
2166                q as u64
2167            };
2168
2169            let mut c = protocol_config.clone();
2170            c.set_execution_version_for_testing(ver);
2171            c
2172        })
2173        .unwrap_or(protocol_config.clone());
2174
2175    let silent = true;
2176    iota_execution::executor(&protocol_config, silent, enable_profiler)
2177        .expect("Creating an executor should not fail here")
2178}
2179
2180fn parse_effect_error_for_denied_coins(status: &IotaExecutionStatus) -> Option<String> {
2181    let IotaExecutionStatus::Failure { error } = status else {
2182        return None;
2183    };
2184    parse_denied_error_string(error)
2185}
2186
2187fn parse_denied_error_string(error: &str) -> Option<String> {
2188    let regulated_regex = regex::Regex::new(
2189        r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2190    )
2191    .unwrap();
2192
2193    let caps = regulated_regex.captures(error)?;
2194    Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2195}
2196
#[cfg(test)]
mod tests {
    use super::parse_denied_error_string;

    /// Both regulated-coin error forms must yield the quoted coin type.
    #[test]
    fn test_regex_regulated_coin_errors() {
        const EXPECTED_COIN_TYPE: &str =
            "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";

        let error_messages = [
            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
        ];

        for message in error_messages {
            assert_eq!(
                parse_denied_error_string(message).as_deref(),
                Some(EXPECTED_COIN_TYPE)
            );
        }
    }
}