iota_replay/
data_fetcher.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::BTreeMap, num::NonZeroUsize, str::FromStr};
6
7use async_trait::async_trait;
8use futures::future::join_all;
9use iota_core::authority::NodeStateDump;
10use iota_json_rpc_api::QUERY_MAX_RESULT_LIMIT;
11use iota_json_rpc_types::{
12    EventFilter, IotaEvent, IotaGetPastObjectRequest, IotaObjectData, IotaObjectDataOptions,
13    IotaObjectResponse, IotaPastObjectResponse, IotaTransactionBlockResponse,
14    IotaTransactionBlockResponseOptions,
15};
16use iota_sdk::IotaClient;
17use iota_types::{
18    base_types::{ObjectID, SequenceNumber, VersionNumber},
19    digests::TransactionDigest,
20    object::Object,
21    transaction::{
22        EndOfEpochTransactionKind, SenderSignedData, TransactionDataAPI, TransactionKind,
23    },
24};
25use lru::LruCache;
26use move_core_types::language_storage::StructTag;
27use parking_lot::RwLock;
28use rand::Rng;
29
30use crate::types::{EPOCH_CHANGE_STRUCT_TAG, ReplayEngineError};
31
32/// This trait defines the interfaces for fetching data from some local or
33/// remote store
34#[async_trait]
35pub(crate) trait DataFetcher {
36    #![allow(implied_bounds_entailment)]
37    /// Fetch the specified versions of objects
38    async fn multi_get_versioned(
39        &self,
40        objects: &[(ObjectID, SequenceNumber)],
41    ) -> Result<Vec<Object>, ReplayEngineError>;
42
43    /// Fetch the latest versions of objects
44    async fn multi_get_latest(
45        &self,
46        objects: &[ObjectID],
47    ) -> Result<Vec<Object>, ReplayEngineError>;
48
49    /// Fetch the TXs for this checkpoint
50    async fn get_checkpoint_txs(
51        &self,
52        id: u64,
53    ) -> Result<Vec<TransactionDigest>, ReplayEngineError>;
54
55    /// Fetch the transaction info for a given transaction digest
56    async fn get_transaction(
57        &self,
58        tx_digest: &TransactionDigest,
59    ) -> Result<IotaTransactionBlockResponse, ReplayEngineError>;
60
61    async fn get_loaded_child_objects(
62        &self,
63        tx_digest: &TransactionDigest,
64    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError>;
65
66    async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError>;
67
68    async fn fetch_random_transaction(
69        &self,
70        // TODO: add more params
71        checkpoint_id_start: Option<u64>,
72        checkpoint_id_end: Option<u64>,
73    ) -> Result<TransactionDigest, ReplayEngineError>;
74
75    async fn get_epoch_start_timestamp_and_rgp(
76        &self,
77        epoch_id: u64,
78    ) -> Result<(u64, u64), ReplayEngineError>;
79
80    async fn get_epoch_change_events(
81        &self,
82        reverse: bool,
83    ) -> Result<Vec<IotaEvent>, ReplayEngineError>;
84
85    async fn get_chain_id(&self) -> Result<String, ReplayEngineError>;
86
87    async fn get_child_object(
88        &self,
89        object_id: &ObjectID,
90        version_upper_bound: VersionNumber,
91    ) -> Result<Object, ReplayEngineError>;
92}
93
94#[derive(Clone)]
95pub enum Fetchers {
96    Remote(RemoteFetcher),
97    NodeStateDump(NodeStateDumpFetcher),
98}
99
100impl Fetchers {
101    pub fn as_remote(&self) -> &RemoteFetcher {
102        match self {
103            Fetchers::Remote(q) => q,
104            Fetchers::NodeStateDump(_) => panic!("not a remote fetcher"),
105        }
106    }
107
108    pub fn into_remote(self) -> RemoteFetcher {
109        match self {
110            Fetchers::Remote(q) => {
111                // Since `into_remote` is called when we use this fetcher to create a new
112                // fetcher, we should clear the cache to avoid using stale data.
113                q.clear_cache_for_new_task();
114                q
115            }
116            Fetchers::NodeStateDump(_) => panic!("not a remote fetcher"),
117        }
118    }
119
120    pub fn as_node_state_dump(&self) -> &NodeStateDumpFetcher {
121        match self {
122            Fetchers::Remote(_) => panic!("not a node state dump fetcher"),
123            Fetchers::NodeStateDump(q) => q,
124        }
125    }
126}
127
128#[async_trait]
129impl DataFetcher for Fetchers {
130    #![allow(implied_bounds_entailment)]
131    async fn multi_get_versioned(
132        &self,
133        objects: &[(ObjectID, SequenceNumber)],
134    ) -> Result<Vec<Object>, ReplayEngineError> {
135        match self {
136            Fetchers::Remote(q) => q.multi_get_versioned(objects).await,
137            Fetchers::NodeStateDump(q) => q.multi_get_versioned(objects).await,
138        }
139    }
140
141    async fn multi_get_latest(
142        &self,
143        objects: &[ObjectID],
144    ) -> Result<Vec<Object>, ReplayEngineError> {
145        match self {
146            Fetchers::Remote(q) => q.multi_get_latest(objects).await,
147            Fetchers::NodeStateDump(q) => q.multi_get_latest(objects).await,
148        }
149    }
150
151    async fn get_checkpoint_txs(
152        &self,
153        id: u64,
154    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
155        match self {
156            Fetchers::Remote(q) => q.get_checkpoint_txs(id).await,
157            Fetchers::NodeStateDump(q) => q.get_checkpoint_txs(id).await,
158        }
159    }
160
161    async fn get_transaction(
162        &self,
163        tx_digest: &TransactionDigest,
164    ) -> Result<IotaTransactionBlockResponse, ReplayEngineError> {
165        match self {
166            Fetchers::Remote(q) => q.get_transaction(tx_digest).await,
167            Fetchers::NodeStateDump(q) => q.get_transaction(tx_digest).await,
168        }
169    }
170
171    async fn get_loaded_child_objects(
172        &self,
173        tx_digest: &TransactionDigest,
174    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
175        match self {
176            Fetchers::Remote(q) => q.get_loaded_child_objects(tx_digest).await,
177            Fetchers::NodeStateDump(q) => q.get_loaded_child_objects(tx_digest).await,
178        }
179    }
180
181    async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError> {
182        match self {
183            Fetchers::Remote(q) => q.get_latest_checkpoint_sequence_number().await,
184            Fetchers::NodeStateDump(q) => q.get_latest_checkpoint_sequence_number().await,
185        }
186    }
187
188    async fn fetch_random_transaction(
189        &self,
190        checkpoint_id_start: Option<u64>,
191        checkpoint_id_end: Option<u64>,
192    ) -> Result<TransactionDigest, ReplayEngineError> {
193        match self {
194            Fetchers::Remote(q) => {
195                q.fetch_random_transaction(checkpoint_id_start, checkpoint_id_end)
196                    .await
197            }
198            Fetchers::NodeStateDump(q) => {
199                q.fetch_random_transaction(checkpoint_id_start, checkpoint_id_end)
200                    .await
201            }
202        }
203    }
204
205    async fn get_epoch_start_timestamp_and_rgp(
206        &self,
207        epoch_id: u64,
208    ) -> Result<(u64, u64), ReplayEngineError> {
209        match self {
210            Fetchers::Remote(q) => q.get_epoch_start_timestamp_and_rgp(epoch_id).await,
211            Fetchers::NodeStateDump(q) => q.get_epoch_start_timestamp_and_rgp(epoch_id).await,
212        }
213    }
214
215    async fn get_epoch_change_events(
216        &self,
217        reverse: bool,
218    ) -> Result<Vec<IotaEvent>, ReplayEngineError> {
219        match self {
220            Fetchers::Remote(q) => q.get_epoch_change_events(reverse).await,
221            Fetchers::NodeStateDump(q) => q.get_epoch_change_events(reverse).await,
222        }
223    }
224    async fn get_chain_id(&self) -> Result<String, ReplayEngineError> {
225        match self {
226            Fetchers::Remote(q) => q.get_chain_id().await,
227            Fetchers::NodeStateDump(q) => q.get_chain_id().await,
228        }
229    }
230    async fn get_child_object(
231        &self,
232        object_id: &ObjectID,
233        version_upper_bound: VersionNumber,
234    ) -> Result<Object, ReplayEngineError> {
235        match self {
236            Fetchers::Remote(q) => q.get_child_object(object_id, version_upper_bound).await,
237            Fetchers::NodeStateDump(q) => q.get_child_object(object_id, version_upper_bound).await,
238        }
239    }
240}
241
242const VERSIONED_OBJECT_CACHE_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1_000);
243const LATEST_OBJECT_CACHE_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1_000);
244const EPOCH_INFO_CACHE_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(10_000);
245
246pub struct RemoteFetcher {
247    /// This is used to download items not in store
248    pub rpc_client: IotaClient,
249    /// Cache versioned objects
250    pub versioned_object_cache: RwLock<LruCache<(ObjectID, VersionNumber), Object>>,
251    /// Cache non-versioned objects
252    pub latest_object_cache: RwLock<LruCache<ObjectID, Object>>,
253    /// Cache epoch info
254    pub epoch_info_cache: RwLock<LruCache<u64, (u64, u64)>>,
255}
256
257impl Clone for RemoteFetcher {
258    fn clone(&self) -> Self {
259        let mut latest =
260            LruCache::new(LATEST_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"));
261        self.latest_object_cache.read().iter().for_each(|(k, v)| {
262            latest.put(*k, v.clone());
263        });
264
265        let mut versioned =
266            LruCache::new(VERSIONED_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"));
267        self.versioned_object_cache
268            .read()
269            .iter()
270            .for_each(|(k, v)| {
271                versioned.put(*k, v.clone());
272            });
273
274        let mut ep = LruCache::new(EPOCH_INFO_CACHE_CAPACITY.expect("Cache size must be non zero"));
275        self.epoch_info_cache.read().iter().for_each(|(k, v)| {
276            ep.put(*k, *v);
277        });
278
279        Self {
280            rpc_client: self.rpc_client.clone(),
281            versioned_object_cache: RwLock::new(versioned),
282            latest_object_cache: RwLock::new(latest),
283            epoch_info_cache: RwLock::new(ep),
284        }
285    }
286}
287
288impl RemoteFetcher {
289    pub fn new(rpc_client: IotaClient) -> Self {
290        Self {
291            rpc_client,
292            versioned_object_cache: RwLock::new(LruCache::new(
293                VERSIONED_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"),
294            )),
295            latest_object_cache: RwLock::new(LruCache::new(
296                LATEST_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"),
297            )),
298            epoch_info_cache: RwLock::new(LruCache::new(
299                EPOCH_INFO_CACHE_CAPACITY.expect("Cache size must be non zero"),
300            )),
301        }
302    }
303
304    pub fn check_versioned_cache(
305        &self,
306        objects: &[(ObjectID, VersionNumber)],
307    ) -> (Vec<Object>, Vec<(ObjectID, VersionNumber)>) {
308        let mut to_fetch = Vec::new();
309        let mut cached = Vec::new();
310        for (object_id, version) in objects {
311            if let Some(obj) = self
312                .versioned_object_cache
313                .read()
314                .peek(&(*object_id, *version))
315            {
316                cached.push(obj.clone());
317            } else {
318                to_fetch.push((*object_id, *version));
319            }
320        }
321
322        (cached, to_fetch)
323    }
324
325    pub fn check_latest_cache(&self, objects: &[ObjectID]) -> (Vec<Object>, Vec<ObjectID>) {
326        let mut to_fetch = Vec::new();
327        let mut cached = Vec::new();
328        for object_id in objects {
329            if let Some(obj) = self.latest_object_cache.read().peek(object_id) {
330                cached.push(obj.clone());
331            } else {
332                to_fetch.push(*object_id);
333            }
334        }
335
336        (cached, to_fetch)
337    }
338
339    pub fn clear_cache_for_new_task(&self) {
340        // Only the latest object cache cannot be reused across tasks.
341        // All other caches should be valid as long as the network doesn't change.
342        self.latest_object_cache.write().clear();
343    }
344}
345
346#[async_trait]
347impl DataFetcher for RemoteFetcher {
348    #![allow(implied_bounds_entailment)]
349    async fn multi_get_versioned(
350        &self,
351        objects: &[(ObjectID, VersionNumber)],
352    ) -> Result<Vec<Object>, ReplayEngineError> {
353        // First check which we have in cache
354        let (cached, to_fetch) = self.check_versioned_cache(objects);
355
356        let options = IotaObjectDataOptions::bcs_lossless();
357
358        let objs: Vec<_> = to_fetch
359            .iter()
360            .map(|(object_id, version)| IotaGetPastObjectRequest {
361                object_id: *object_id,
362                version: *version,
363            })
364            .collect();
365
366        let objectsx = objs.chunks(*QUERY_MAX_RESULT_LIMIT).map(|q| {
367            self.rpc_client
368                .read_api()
369                .try_multi_get_parsed_past_object(q.to_vec(), options.clone())
370        });
371
372        join_all(objectsx)
373            .await
374            .into_iter()
375            .collect::<Result<Vec<Vec<_>>, _>>()
376            .map_err(ReplayEngineError::from)?
377            .iter()
378            .flatten()
379            .map(|q| convert_past_obj_response(q.clone()))
380            .collect::<Result<Vec<_>, _>>()
381            .map(|mut x| {
382                // Add the cached objects to the result
383                x.extend(cached);
384                // Backfill the cache
385                for obj in &x {
386                    let r = obj.compute_object_reference();
387                    self.versioned_object_cache
388                        .write()
389                        .put((r.0, r.1), obj.clone());
390                }
391                x
392            })
393    }
394
395    async fn get_child_object(
396        &self,
397        object_id: &ObjectID,
398        version_upper_bound: VersionNumber,
399    ) -> Result<Object, ReplayEngineError> {
400        let response = self
401            .rpc_client
402            .read_api()
403            .try_get_object_before_version(*object_id, version_upper_bound)
404            .await
405            .map_err(|q| ReplayEngineError::IotaRpcError { err: q.to_string() })?;
406        convert_past_obj_response(response)
407    }
408
409    async fn multi_get_latest(
410        &self,
411        objects: &[ObjectID],
412    ) -> Result<Vec<Object>, ReplayEngineError> {
413        // First check which we have in cache
414        let (cached, to_fetch) = self.check_latest_cache(objects);
415
416        let options = IotaObjectDataOptions::bcs_lossless();
417
418        let objectsx = to_fetch.chunks(*QUERY_MAX_RESULT_LIMIT).map(|q| {
419            self.rpc_client
420                .read_api()
421                .multi_get_object_with_options(q.to_vec(), options.clone())
422        });
423
424        join_all(objectsx)
425            .await
426            .into_iter()
427            .collect::<Result<Vec<Vec<_>>, _>>()
428            .map_err(ReplayEngineError::from)?
429            .iter()
430            .flatten()
431            .map(obj_from_iota_obj_response)
432            .collect::<Result<Vec<_>, _>>()
433            .map(|mut x| {
434                // Add the cached objects to the result
435                x.extend(cached);
436                // Backfill the cache
437                for obj in &x {
438                    self.latest_object_cache.write().put(obj.id(), obj.clone());
439                }
440                x
441            })
442    }
443
444    async fn get_checkpoint_txs(
445        &self,
446        id: u64,
447    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
448        Ok(self
449            .rpc_client
450            .read_api()
451            .get_checkpoint(id.into())
452            .await
453            .map_err(|q| ReplayEngineError::IotaRpcError { err: q.to_string() })?
454            .transactions)
455    }
456
457    async fn get_transaction(
458        &self,
459        tx_digest: &TransactionDigest,
460    ) -> Result<IotaTransactionBlockResponse, ReplayEngineError> {
461        let tx_fetch_opts = IotaTransactionBlockResponseOptions::full_content();
462
463        self.rpc_client
464            .read_api()
465            .get_transaction_with_options(*tx_digest, tx_fetch_opts)
466            .await
467            .map_err(ReplayEngineError::from)
468    }
469
470    async fn get_loaded_child_objects(
471        &self,
472        _: &TransactionDigest,
473    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
474        Ok(vec![])
475    }
476
477    async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError> {
478        self.rpc_client
479            .read_api()
480            .get_latest_checkpoint_sequence_number()
481            .await
482            .map_err(ReplayEngineError::from)
483    }
484
485    async fn fetch_random_transaction(
486        &self,
487        // TODO: add more params
488        checkpoint_id_start_inclusive: Option<u64>,
489        checkpoint_id_end_inclusive: Option<u64>,
490    ) -> Result<TransactionDigest, ReplayEngineError> {
491        let checkpoint_id_end = checkpoint_id_end_inclusive
492            .unwrap_or(self.get_latest_checkpoint_sequence_number().await?);
493        let checkpoint_id_start = checkpoint_id_start_inclusive.unwrap_or(1);
494        let checkpoint_id = rand::thread_rng().gen_range(checkpoint_id_start..=checkpoint_id_end);
495
496        let txs = self.get_checkpoint_txs(checkpoint_id).await?;
497        let tx_idx = rand::thread_rng().gen_range(0..txs.len());
498
499        Ok(txs[tx_idx])
500    }
501
502    async fn get_epoch_start_timestamp_and_rgp(
503        &self,
504        epoch_id: u64,
505    ) -> Result<(u64, u64), ReplayEngineError> {
506        // Check epoch info cache
507        if let Some((ts, rgp)) = self.epoch_info_cache.read().peek(&epoch_id) {
508            return Ok((*ts, *rgp));
509        }
510
511        let event = self
512            .get_epoch_change_events(true)
513            .await?
514            .into_iter()
515            .find(|ev| match extract_epoch_and_version(ev.clone()) {
516                Ok((epoch, _)) => epoch == epoch_id,
517                Err(_) => false,
518            })
519            .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
520
521        let reference_gas_price = if let serde_json::Value::Object(w) = event.parsed_json {
522            u64::from_str(&w["reference_gas_price"].to_string().replace('\"', "")).unwrap()
523        } else {
524            return Err(ReplayEngineError::UnexpectedEventFormat {
525                event: Box::new(event.clone()),
526            });
527        };
528
529        let epoch_change_tx = event.id.tx_digest;
530
531        // Fetch full transaction content
532        let tx_info = self.get_transaction(&epoch_change_tx).await?;
533
534        let orig_tx: SenderSignedData = bcs::from_bytes(&tx_info.raw_transaction).unwrap();
535        let tx_kind_orig = orig_tx.transaction_data().kind();
536
537        if let TransactionKind::EndOfEpochTransaction(kinds) = tx_kind_orig {
538            for kind in kinds {
539                if let EndOfEpochTransactionKind::ChangeEpoch(change) = kind {
540                    // Backfill cache
541                    self.epoch_info_cache.write().put(
542                        epoch_id,
543                        (change.epoch_start_timestamp_ms, reference_gas_price),
544                    );
545
546                    return Ok((change.epoch_start_timestamp_ms, reference_gas_price));
547                }
548            }
549        }
550        Err(ReplayEngineError::InvalidEpochChangeTx { epoch: epoch_id })
551    }
552
553    async fn get_epoch_change_events(
554        &self,
555        reverse: bool,
556    ) -> Result<Vec<IotaEvent>, ReplayEngineError> {
557        let struct_tag_str = EPOCH_CHANGE_STRUCT_TAG.to_string();
558        let struct_tag = StructTag::from_str(&struct_tag_str)?;
559
560        let mut epoch_change_events: Vec<IotaEvent> = vec![];
561        let mut has_next_page = true;
562        let mut cursor = None;
563
564        while has_next_page {
565            let page_data = self
566                .rpc_client
567                .event_api()
568                .query_events(
569                    EventFilter::MoveEventType(struct_tag.clone()),
570                    cursor,
571                    None,
572                    reverse,
573                )
574                .await
575                .map_err(|e| ReplayEngineError::UnableToQuerySystemEvents {
576                    rpc_err: e.to_string(),
577                })?;
578            epoch_change_events.extend(page_data.data);
579            has_next_page = page_data.has_next_page;
580            cursor = page_data.next_cursor;
581        }
582
583        Ok(epoch_change_events)
584    }
585
586    async fn get_chain_id(&self) -> Result<String, ReplayEngineError> {
587        let chain_id = self
588            .rpc_client
589            .read_api()
590            .get_chain_identifier()
591            .await
592            .map_err(|e| ReplayEngineError::UnableToGetChainId { err: e.to_string() })?;
593        Ok(chain_id)
594    }
595}
596
597fn convert_past_obj_response(resp: IotaPastObjectResponse) -> Result<Object, ReplayEngineError> {
598    match resp {
599        IotaPastObjectResponse::VersionFound(o) => obj_from_iota_obj_data(&o),
600        IotaPastObjectResponse::ObjectDeleted(r) => Err(ReplayEngineError::ObjectDeleted {
601            id: r.object_id,
602            version: r.version,
603            digest: r.digest,
604        }),
605        IotaPastObjectResponse::ObjectNotExists(id) => {
606            Err(ReplayEngineError::ObjectNotExist { id })
607        }
608        IotaPastObjectResponse::VersionNotFound(id, version) => {
609            Err(ReplayEngineError::ObjectVersionNotFound { id, version })
610        }
611        IotaPastObjectResponse::VersionTooHigh {
612            object_id,
613            asked_version,
614            latest_version,
615        } => Err(ReplayEngineError::ObjectVersionTooHigh {
616            id: object_id,
617            asked_version,
618            latest_version,
619        }),
620    }
621}
622
623fn obj_from_iota_obj_response(o: &IotaObjectResponse) -> Result<Object, ReplayEngineError> {
624    let o = o.object().map_err(ReplayEngineError::from)?.clone();
625    obj_from_iota_obj_data(&o)
626}
627
628fn obj_from_iota_obj_data(o: &IotaObjectData) -> Result<Object, ReplayEngineError> {
629    match TryInto::<Object>::try_into(o.clone()) {
630        Ok(obj) => Ok(obj),
631        Err(e) => Err(e.into()),
632    }
633}
634
635pub fn extract_epoch_and_version(ev: IotaEvent) -> Result<(u64, u64), ReplayEngineError> {
636    if let serde_json::Value::Object(w) = ev.parsed_json {
637        let epoch = u64::from_str(&w["epoch"].to_string().replace('\"', "")).unwrap();
638        let version = u64::from_str(&w["protocol_version"].to_string().replace('\"', "")).unwrap();
639        return Ok((epoch, version));
640    }
641
642    Err(ReplayEngineError::UnexpectedEventFormat {
643        event: Box::new(ev),
644    })
645}
646
647#[derive(Clone)]
648pub struct NodeStateDumpFetcher {
649    pub node_state_dump: NodeStateDump,
650    pub object_ref_pool: BTreeMap<(ObjectID, SequenceNumber), Object>,
651    pub latest_object_version_pool: BTreeMap<ObjectID, Object>,
652
653    // Used when we need to fetch data from remote such as
654    pub backup_remote_fetcher: Option<RemoteFetcher>,
655}
656
657impl From<NodeStateDump> for NodeStateDumpFetcher {
658    fn from(node_state_dump: NodeStateDump) -> Self {
659        let mut object_ref_pool = BTreeMap::new();
660        let mut latest_object_version_pool: BTreeMap<ObjectID, Object> = BTreeMap::new();
661
662        node_state_dump
663            .all_objects()
664            .iter()
665            .for_each(|current_obj| {
666                // Dense storage
667                object_ref_pool.insert(
668                    (current_obj.id, current_obj.version),
669                    current_obj.object.clone(),
670                );
671
672                // Only most recent
673                if let Some(last_seen_obj) = latest_object_version_pool.get(&current_obj.id) {
674                    if current_obj.version <= last_seen_obj.version() {
675                        return;
676                    }
677                };
678                latest_object_version_pool.insert(current_obj.id, current_obj.object.clone());
679            });
680        Self {
681            node_state_dump,
682            object_ref_pool,
683            latest_object_version_pool,
684            backup_remote_fetcher: None,
685        }
686    }
687}
688
689impl NodeStateDumpFetcher {
690    pub fn new(
691        node_state_dump: NodeStateDump,
692        backup_remote_fetcher: Option<RemoteFetcher>,
693    ) -> Self {
694        let mut s = Self::from(node_state_dump);
695        s.backup_remote_fetcher = backup_remote_fetcher;
696        s
697    }
698}
699
700#[async_trait]
701impl DataFetcher for NodeStateDumpFetcher {
702    async fn multi_get_versioned(
703        &self,
704        objects: &[(ObjectID, SequenceNumber)],
705    ) -> Result<Vec<Object>, ReplayEngineError> {
706        let mut resp = vec![];
707        match objects.iter().try_for_each(|(id, version)| {
708            if let Some(obj) = self.object_ref_pool.get(&(*id, *version)) {
709                resp.push(obj.clone());
710                return Ok(());
711            }
712            Err(ReplayEngineError::ObjectVersionNotFound {
713                id: *id,
714                version: *version,
715            })
716        }) {
717            Ok(_) => return Ok(resp),
718            Err(e) => {
719                if let Some(backup_remote_fetcher) = &self.backup_remote_fetcher {
720                    return backup_remote_fetcher.multi_get_versioned(objects).await;
721                }
722                return Err(e);
723            }
724        };
725    }
726
727    async fn multi_get_latest(
728        &self,
729        objects: &[ObjectID],
730    ) -> Result<Vec<Object>, ReplayEngineError> {
731        let mut resp = vec![];
732        match objects.iter().try_for_each(|id| {
733            if let Some(obj) = self.latest_object_version_pool.get(id) {
734                resp.push(obj.clone());
735                return Ok(());
736            }
737            Err(ReplayEngineError::ObjectNotExist { id: *id })
738        }) {
739            Ok(_) => return Ok(resp),
740            Err(e) => {
741                if let Some(backup_remote_fetcher) = &self.backup_remote_fetcher {
742                    return backup_remote_fetcher.multi_get_latest(objects).await;
743                }
744                return Err(e);
745            }
746        };
747    }
748
749    async fn get_checkpoint_txs(
750        &self,
751        _id: u64,
752    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
753        unimplemented!("get_checkpoint_txs for state dump is not implemented")
754    }
755
756    async fn get_transaction(
757        &self,
758        _tx_digest: &TransactionDigest,
759    ) -> Result<IotaTransactionBlockResponse, ReplayEngineError> {
760        unimplemented!("get_transaction for state dump is not implemented")
761    }
762
763    async fn get_loaded_child_objects(
764        &self,
765        _tx_digest: &TransactionDigest,
766    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
767        Ok(self
768            .node_state_dump
769            .loaded_child_objects
770            .iter()
771            .map(|q| (q.id, q.version, q.digest))
772            .map(|w| (w.0, w.1))
773            .collect())
774    }
775
776    async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError> {
777        unimplemented!("get_latest_checkpoint_sequence_number for state dump is not implemented")
778    }
779
780    async fn fetch_random_transaction(
781        &self,
782        // TODO: add more params
783        _checkpoint_id_start: Option<u64>,
784        _checkpoint_id_end: Option<u64>,
785    ) -> Result<TransactionDigest, ReplayEngineError> {
786        unimplemented!("fetch_random_tx for state dump is not implemented")
787    }
788
789    async fn get_epoch_start_timestamp_and_rgp(
790        &self,
791        _epoch_id: u64,
792    ) -> Result<(u64, u64), ReplayEngineError> {
793        Ok((
794            self.node_state_dump.epoch_start_timestamp_ms,
795            self.node_state_dump.reference_gas_price,
796        ))
797    }
798
799    async fn get_epoch_change_events(
800        &self,
801        _reverse: bool,
802    ) -> Result<Vec<IotaEvent>, ReplayEngineError> {
803        unimplemented!("get_epoch_change_events for state dump is not implemented")
804    }
805
806    async fn get_chain_id(&self) -> Result<String, ReplayEngineError> {
807        unimplemented!("get_chain_id for state dump is not implemented")
808    }
809
810    async fn get_child_object(
811        &self,
812        _object_id: &ObjectID,
813        _version_upper_bound: VersionNumber,
814    ) -> Result<Object, ReplayEngineError> {
815        unimplemented!("get child object is not implemented for state dump");
816    }
817}