1use std::{collections::BTreeMap, num::NonZeroUsize, str::FromStr};
6
7use async_trait::async_trait;
8use futures::future::join_all;
9use iota_core::authority::NodeStateDump;
10use iota_json_rpc_api::QUERY_MAX_RESULT_LIMIT;
11use iota_json_rpc_types::{
12 EventFilter, IotaEvent, IotaGetPastObjectRequest, IotaObjectData, IotaObjectDataOptions,
13 IotaObjectResponse, IotaPastObjectResponse, IotaTransactionBlockResponse,
14 IotaTransactionBlockResponseOptions,
15};
16use iota_sdk::IotaClient;
17use iota_types::{
18 base_types::{ObjectID, SequenceNumber, VersionNumber},
19 digests::TransactionDigest,
20 object::Object,
21 transaction::{
22 EndOfEpochTransactionKind, SenderSignedData, TransactionDataAPI, TransactionKind,
23 },
24};
25use lru::LruCache;
26use move_core_types::language_storage::StructTag;
27use parking_lot::RwLock;
28use rand::Rng;
29
30use crate::types::{EPOCH_CHANGE_STRUCT_TAG, ReplayEngineError};
31
32#[async_trait]
35pub(crate) trait DataFetcher {
36 #![allow(implied_bounds_entailment)]
37 async fn multi_get_versioned(
39 &self,
40 objects: &[(ObjectID, SequenceNumber)],
41 ) -> Result<Vec<Object>, ReplayEngineError>;
42
43 async fn multi_get_latest(
45 &self,
46 objects: &[ObjectID],
47 ) -> Result<Vec<Object>, ReplayEngineError>;
48
49 async fn get_checkpoint_txs(
51 &self,
52 id: u64,
53 ) -> Result<Vec<TransactionDigest>, ReplayEngineError>;
54
55 async fn get_transaction(
57 &self,
58 tx_digest: &TransactionDigest,
59 ) -> Result<IotaTransactionBlockResponse, ReplayEngineError>;
60
61 async fn get_loaded_child_objects(
62 &self,
63 tx_digest: &TransactionDigest,
64 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError>;
65
66 async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError>;
67
68 async fn fetch_random_transaction(
69 &self,
70 checkpoint_id_start: Option<u64>,
72 checkpoint_id_end: Option<u64>,
73 ) -> Result<TransactionDigest, ReplayEngineError>;
74
75 async fn get_epoch_start_timestamp_and_rgp(
76 &self,
77 epoch_id: u64,
78 ) -> Result<(u64, u64), ReplayEngineError>;
79
80 async fn get_epoch_change_events(
81 &self,
82 reverse: bool,
83 ) -> Result<Vec<IotaEvent>, ReplayEngineError>;
84
85 async fn get_chain_id(&self) -> Result<String, ReplayEngineError>;
86
87 async fn get_child_object(
88 &self,
89 object_id: &ObjectID,
90 version_upper_bound: VersionNumber,
91 ) -> Result<Object, ReplayEngineError>;
92}
93
94#[derive(Clone)]
95pub enum Fetchers {
96 Remote(RemoteFetcher),
97 NodeStateDump(NodeStateDumpFetcher),
98}
99
100impl Fetchers {
101 pub fn as_remote(&self) -> &RemoteFetcher {
102 match self {
103 Fetchers::Remote(q) => q,
104 Fetchers::NodeStateDump(_) => panic!("not a remote fetcher"),
105 }
106 }
107
108 pub fn into_remote(self) -> RemoteFetcher {
109 match self {
110 Fetchers::Remote(q) => {
111 q.clear_cache_for_new_task();
114 q
115 }
116 Fetchers::NodeStateDump(_) => panic!("not a remote fetcher"),
117 }
118 }
119
120 pub fn as_node_state_dump(&self) -> &NodeStateDumpFetcher {
121 match self {
122 Fetchers::Remote(_) => panic!("not a node state dump fetcher"),
123 Fetchers::NodeStateDump(q) => q,
124 }
125 }
126}
127
128#[async_trait]
129impl DataFetcher for Fetchers {
130 #![allow(implied_bounds_entailment)]
131 async fn multi_get_versioned(
132 &self,
133 objects: &[(ObjectID, SequenceNumber)],
134 ) -> Result<Vec<Object>, ReplayEngineError> {
135 match self {
136 Fetchers::Remote(q) => q.multi_get_versioned(objects).await,
137 Fetchers::NodeStateDump(q) => q.multi_get_versioned(objects).await,
138 }
139 }
140
141 async fn multi_get_latest(
142 &self,
143 objects: &[ObjectID],
144 ) -> Result<Vec<Object>, ReplayEngineError> {
145 match self {
146 Fetchers::Remote(q) => q.multi_get_latest(objects).await,
147 Fetchers::NodeStateDump(q) => q.multi_get_latest(objects).await,
148 }
149 }
150
151 async fn get_checkpoint_txs(
152 &self,
153 id: u64,
154 ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
155 match self {
156 Fetchers::Remote(q) => q.get_checkpoint_txs(id).await,
157 Fetchers::NodeStateDump(q) => q.get_checkpoint_txs(id).await,
158 }
159 }
160
161 async fn get_transaction(
162 &self,
163 tx_digest: &TransactionDigest,
164 ) -> Result<IotaTransactionBlockResponse, ReplayEngineError> {
165 match self {
166 Fetchers::Remote(q) => q.get_transaction(tx_digest).await,
167 Fetchers::NodeStateDump(q) => q.get_transaction(tx_digest).await,
168 }
169 }
170
171 async fn get_loaded_child_objects(
172 &self,
173 tx_digest: &TransactionDigest,
174 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
175 match self {
176 Fetchers::Remote(q) => q.get_loaded_child_objects(tx_digest).await,
177 Fetchers::NodeStateDump(q) => q.get_loaded_child_objects(tx_digest).await,
178 }
179 }
180
181 async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError> {
182 match self {
183 Fetchers::Remote(q) => q.get_latest_checkpoint_sequence_number().await,
184 Fetchers::NodeStateDump(q) => q.get_latest_checkpoint_sequence_number().await,
185 }
186 }
187
188 async fn fetch_random_transaction(
189 &self,
190 checkpoint_id_start: Option<u64>,
191 checkpoint_id_end: Option<u64>,
192 ) -> Result<TransactionDigest, ReplayEngineError> {
193 match self {
194 Fetchers::Remote(q) => {
195 q.fetch_random_transaction(checkpoint_id_start, checkpoint_id_end)
196 .await
197 }
198 Fetchers::NodeStateDump(q) => {
199 q.fetch_random_transaction(checkpoint_id_start, checkpoint_id_end)
200 .await
201 }
202 }
203 }
204
205 async fn get_epoch_start_timestamp_and_rgp(
206 &self,
207 epoch_id: u64,
208 ) -> Result<(u64, u64), ReplayEngineError> {
209 match self {
210 Fetchers::Remote(q) => q.get_epoch_start_timestamp_and_rgp(epoch_id).await,
211 Fetchers::NodeStateDump(q) => q.get_epoch_start_timestamp_and_rgp(epoch_id).await,
212 }
213 }
214
215 async fn get_epoch_change_events(
216 &self,
217 reverse: bool,
218 ) -> Result<Vec<IotaEvent>, ReplayEngineError> {
219 match self {
220 Fetchers::Remote(q) => q.get_epoch_change_events(reverse).await,
221 Fetchers::NodeStateDump(q) => q.get_epoch_change_events(reverse).await,
222 }
223 }
224 async fn get_chain_id(&self) -> Result<String, ReplayEngineError> {
225 match self {
226 Fetchers::Remote(q) => q.get_chain_id().await,
227 Fetchers::NodeStateDump(q) => q.get_chain_id().await,
228 }
229 }
230 async fn get_child_object(
231 &self,
232 object_id: &ObjectID,
233 version_upper_bound: VersionNumber,
234 ) -> Result<Object, ReplayEngineError> {
235 match self {
236 Fetchers::Remote(q) => q.get_child_object(object_id, version_upper_bound).await,
237 Fetchers::NodeStateDump(q) => q.get_child_object(object_id, version_upper_bound).await,
238 }
239 }
240}
241
242const VERSIONED_OBJECT_CACHE_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1_000);
243const LATEST_OBJECT_CACHE_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1_000);
244const EPOCH_INFO_CACHE_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(10_000);
245
246pub struct RemoteFetcher {
247 pub rpc_client: IotaClient,
249 pub versioned_object_cache: RwLock<LruCache<(ObjectID, VersionNumber), Object>>,
251 pub latest_object_cache: RwLock<LruCache<ObjectID, Object>>,
253 pub epoch_info_cache: RwLock<LruCache<u64, (u64, u64)>>,
255}
256
257impl Clone for RemoteFetcher {
258 fn clone(&self) -> Self {
259 let mut latest =
260 LruCache::new(LATEST_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"));
261 self.latest_object_cache.read().iter().for_each(|(k, v)| {
262 latest.put(*k, v.clone());
263 });
264
265 let mut versioned =
266 LruCache::new(VERSIONED_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"));
267 self.versioned_object_cache
268 .read()
269 .iter()
270 .for_each(|(k, v)| {
271 versioned.put(*k, v.clone());
272 });
273
274 let mut ep = LruCache::new(EPOCH_INFO_CACHE_CAPACITY.expect("Cache size must be non zero"));
275 self.epoch_info_cache.read().iter().for_each(|(k, v)| {
276 ep.put(*k, *v);
277 });
278
279 Self {
280 rpc_client: self.rpc_client.clone(),
281 versioned_object_cache: RwLock::new(versioned),
282 latest_object_cache: RwLock::new(latest),
283 epoch_info_cache: RwLock::new(ep),
284 }
285 }
286}
287
288impl RemoteFetcher {
289 pub fn new(rpc_client: IotaClient) -> Self {
290 Self {
291 rpc_client,
292 versioned_object_cache: RwLock::new(LruCache::new(
293 VERSIONED_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"),
294 )),
295 latest_object_cache: RwLock::new(LruCache::new(
296 LATEST_OBJECT_CACHE_CAPACITY.expect("Cache size must be non zero"),
297 )),
298 epoch_info_cache: RwLock::new(LruCache::new(
299 EPOCH_INFO_CACHE_CAPACITY.expect("Cache size must be non zero"),
300 )),
301 }
302 }
303
304 pub fn check_versioned_cache(
305 &self,
306 objects: &[(ObjectID, VersionNumber)],
307 ) -> (Vec<Object>, Vec<(ObjectID, VersionNumber)>) {
308 let mut to_fetch = Vec::new();
309 let mut cached = Vec::new();
310 for (object_id, version) in objects {
311 if let Some(obj) = self
312 .versioned_object_cache
313 .read()
314 .peek(&(*object_id, *version))
315 {
316 cached.push(obj.clone());
317 } else {
318 to_fetch.push((*object_id, *version));
319 }
320 }
321
322 (cached, to_fetch)
323 }
324
325 pub fn check_latest_cache(&self, objects: &[ObjectID]) -> (Vec<Object>, Vec<ObjectID>) {
326 let mut to_fetch = Vec::new();
327 let mut cached = Vec::new();
328 for object_id in objects {
329 if let Some(obj) = self.latest_object_cache.read().peek(object_id) {
330 cached.push(obj.clone());
331 } else {
332 to_fetch.push(*object_id);
333 }
334 }
335
336 (cached, to_fetch)
337 }
338
339 pub fn clear_cache_for_new_task(&self) {
340 self.latest_object_cache.write().clear();
343 }
344}
345
346#[async_trait]
347impl DataFetcher for RemoteFetcher {
348 #![allow(implied_bounds_entailment)]
349 async fn multi_get_versioned(
350 &self,
351 objects: &[(ObjectID, VersionNumber)],
352 ) -> Result<Vec<Object>, ReplayEngineError> {
353 let (cached, to_fetch) = self.check_versioned_cache(objects);
355
356 let options = IotaObjectDataOptions::bcs_lossless();
357
358 let objs: Vec<_> = to_fetch
359 .iter()
360 .map(|(object_id, version)| IotaGetPastObjectRequest {
361 object_id: *object_id,
362 version: *version,
363 })
364 .collect();
365
366 let objectsx = objs.chunks(*QUERY_MAX_RESULT_LIMIT).map(|q| {
367 self.rpc_client
368 .read_api()
369 .try_multi_get_parsed_past_object(q.to_vec(), options.clone())
370 });
371
372 join_all(objectsx)
373 .await
374 .into_iter()
375 .collect::<Result<Vec<Vec<_>>, _>>()
376 .map_err(ReplayEngineError::from)?
377 .iter()
378 .flatten()
379 .map(|q| convert_past_obj_response(q.clone()))
380 .collect::<Result<Vec<_>, _>>()
381 .map(|mut x| {
382 x.extend(cached);
384 for obj in &x {
386 let r = obj.compute_object_reference();
387 self.versioned_object_cache
388 .write()
389 .put((r.0, r.1), obj.clone());
390 }
391 x
392 })
393 }
394
395 async fn get_child_object(
396 &self,
397 object_id: &ObjectID,
398 version_upper_bound: VersionNumber,
399 ) -> Result<Object, ReplayEngineError> {
400 let response = self
401 .rpc_client
402 .read_api()
403 .try_get_object_before_version(*object_id, version_upper_bound)
404 .await
405 .map_err(|q| ReplayEngineError::IotaRpcError { err: q.to_string() })?;
406 convert_past_obj_response(response)
407 }
408
409 async fn multi_get_latest(
410 &self,
411 objects: &[ObjectID],
412 ) -> Result<Vec<Object>, ReplayEngineError> {
413 let (cached, to_fetch) = self.check_latest_cache(objects);
415
416 let options = IotaObjectDataOptions::bcs_lossless();
417
418 let objectsx = to_fetch.chunks(*QUERY_MAX_RESULT_LIMIT).map(|q| {
419 self.rpc_client
420 .read_api()
421 .multi_get_object_with_options(q.to_vec(), options.clone())
422 });
423
424 join_all(objectsx)
425 .await
426 .into_iter()
427 .collect::<Result<Vec<Vec<_>>, _>>()
428 .map_err(ReplayEngineError::from)?
429 .iter()
430 .flatten()
431 .map(obj_from_iota_obj_response)
432 .collect::<Result<Vec<_>, _>>()
433 .map(|mut x| {
434 x.extend(cached);
436 for obj in &x {
438 self.latest_object_cache.write().put(obj.id(), obj.clone());
439 }
440 x
441 })
442 }
443
444 async fn get_checkpoint_txs(
445 &self,
446 id: u64,
447 ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
448 Ok(self
449 .rpc_client
450 .read_api()
451 .get_checkpoint(id.into())
452 .await
453 .map_err(|q| ReplayEngineError::IotaRpcError { err: q.to_string() })?
454 .transactions)
455 }
456
457 async fn get_transaction(
458 &self,
459 tx_digest: &TransactionDigest,
460 ) -> Result<IotaTransactionBlockResponse, ReplayEngineError> {
461 let tx_fetch_opts = IotaTransactionBlockResponseOptions::full_content();
462
463 self.rpc_client
464 .read_api()
465 .get_transaction_with_options(*tx_digest, tx_fetch_opts)
466 .await
467 .map_err(ReplayEngineError::from)
468 }
469
470 async fn get_loaded_child_objects(
471 &self,
472 _: &TransactionDigest,
473 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
474 Ok(vec![])
475 }
476
477 async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError> {
478 self.rpc_client
479 .read_api()
480 .get_latest_checkpoint_sequence_number()
481 .await
482 .map_err(ReplayEngineError::from)
483 }
484
485 async fn fetch_random_transaction(
486 &self,
487 checkpoint_id_start_inclusive: Option<u64>,
489 checkpoint_id_end_inclusive: Option<u64>,
490 ) -> Result<TransactionDigest, ReplayEngineError> {
491 let checkpoint_id_end = checkpoint_id_end_inclusive
492 .unwrap_or(self.get_latest_checkpoint_sequence_number().await?);
493 let checkpoint_id_start = checkpoint_id_start_inclusive.unwrap_or(1);
494 let checkpoint_id = rand::thread_rng().gen_range(checkpoint_id_start..=checkpoint_id_end);
495
496 let txs = self.get_checkpoint_txs(checkpoint_id).await?;
497 let tx_idx = rand::thread_rng().gen_range(0..txs.len());
498
499 Ok(txs[tx_idx])
500 }
501
502 async fn get_epoch_start_timestamp_and_rgp(
503 &self,
504 epoch_id: u64,
505 ) -> Result<(u64, u64), ReplayEngineError> {
506 if let Some((ts, rgp)) = self.epoch_info_cache.read().peek(&epoch_id) {
508 return Ok((*ts, *rgp));
509 }
510
511 let event = self
512 .get_epoch_change_events(true)
513 .await?
514 .into_iter()
515 .find(|ev| match extract_epoch_and_version(ev.clone()) {
516 Ok((epoch, _)) => epoch == epoch_id,
517 Err(_) => false,
518 })
519 .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
520
521 let reference_gas_price = if let serde_json::Value::Object(w) = event.parsed_json {
522 u64::from_str(&w["reference_gas_price"].to_string().replace('\"', "")).unwrap()
523 } else {
524 return Err(ReplayEngineError::UnexpectedEventFormat {
525 event: Box::new(event.clone()),
526 });
527 };
528
529 let epoch_change_tx = event.id.tx_digest;
530
531 let tx_info = self.get_transaction(&epoch_change_tx).await?;
533
534 let orig_tx: SenderSignedData = bcs::from_bytes(&tx_info.raw_transaction).unwrap();
535 let tx_kind_orig = orig_tx.transaction_data().kind();
536
537 if let TransactionKind::EndOfEpochTransaction(kinds) = tx_kind_orig {
538 for kind in kinds {
539 if let EndOfEpochTransactionKind::ChangeEpoch(change) = kind {
540 self.epoch_info_cache.write().put(
542 epoch_id,
543 (change.epoch_start_timestamp_ms, reference_gas_price),
544 );
545
546 return Ok((change.epoch_start_timestamp_ms, reference_gas_price));
547 }
548 }
549 }
550 Err(ReplayEngineError::InvalidEpochChangeTx { epoch: epoch_id })
551 }
552
553 async fn get_epoch_change_events(
554 &self,
555 reverse: bool,
556 ) -> Result<Vec<IotaEvent>, ReplayEngineError> {
557 let struct_tag_str = EPOCH_CHANGE_STRUCT_TAG.to_string();
558 let struct_tag = StructTag::from_str(&struct_tag_str)?;
559
560 let mut epoch_change_events: Vec<IotaEvent> = vec![];
561 let mut has_next_page = true;
562 let mut cursor = None;
563
564 while has_next_page {
565 let page_data = self
566 .rpc_client
567 .event_api()
568 .query_events(
569 EventFilter::MoveEventType(struct_tag.clone()),
570 cursor,
571 None,
572 reverse,
573 )
574 .await
575 .map_err(|e| ReplayEngineError::UnableToQuerySystemEvents {
576 rpc_err: e.to_string(),
577 })?;
578 epoch_change_events.extend(page_data.data);
579 has_next_page = page_data.has_next_page;
580 cursor = page_data.next_cursor;
581 }
582
583 Ok(epoch_change_events)
584 }
585
586 async fn get_chain_id(&self) -> Result<String, ReplayEngineError> {
587 let chain_id = self
588 .rpc_client
589 .read_api()
590 .get_chain_identifier()
591 .await
592 .map_err(|e| ReplayEngineError::UnableToGetChainId { err: e.to_string() })?;
593 Ok(chain_id)
594 }
595}
596
597fn convert_past_obj_response(resp: IotaPastObjectResponse) -> Result<Object, ReplayEngineError> {
598 match resp {
599 IotaPastObjectResponse::VersionFound(o) => obj_from_iota_obj_data(&o),
600 IotaPastObjectResponse::ObjectDeleted(r) => Err(ReplayEngineError::ObjectDeleted {
601 id: r.object_id,
602 version: r.version,
603 digest: r.digest,
604 }),
605 IotaPastObjectResponse::ObjectNotExists(id) => {
606 Err(ReplayEngineError::ObjectNotExist { id })
607 }
608 IotaPastObjectResponse::VersionNotFound(id, version) => {
609 Err(ReplayEngineError::ObjectVersionNotFound { id, version })
610 }
611 IotaPastObjectResponse::VersionTooHigh {
612 object_id,
613 asked_version,
614 latest_version,
615 } => Err(ReplayEngineError::ObjectVersionTooHigh {
616 id: object_id,
617 asked_version,
618 latest_version,
619 }),
620 }
621}
622
623fn obj_from_iota_obj_response(o: &IotaObjectResponse) -> Result<Object, ReplayEngineError> {
624 let o = o.object().map_err(ReplayEngineError::from)?.clone();
625 obj_from_iota_obj_data(&o)
626}
627
628fn obj_from_iota_obj_data(o: &IotaObjectData) -> Result<Object, ReplayEngineError> {
629 match TryInto::<Object>::try_into(o.clone()) {
630 Ok(obj) => Ok(obj),
631 Err(e) => Err(e.into()),
632 }
633}
634
635pub fn extract_epoch_and_version(ev: IotaEvent) -> Result<(u64, u64), ReplayEngineError> {
636 if let serde_json::Value::Object(w) = ev.parsed_json {
637 let epoch = u64::from_str(&w["epoch"].to_string().replace('\"', "")).unwrap();
638 let version = u64::from_str(&w["protocol_version"].to_string().replace('\"', "")).unwrap();
639 return Ok((epoch, version));
640 }
641
642 Err(ReplayEngineError::UnexpectedEventFormat {
643 event: Box::new(ev),
644 })
645}
646
647#[derive(Clone)]
648pub struct NodeStateDumpFetcher {
649 pub node_state_dump: NodeStateDump,
650 pub object_ref_pool: BTreeMap<(ObjectID, SequenceNumber), Object>,
651 pub latest_object_version_pool: BTreeMap<ObjectID, Object>,
652
653 pub backup_remote_fetcher: Option<RemoteFetcher>,
655}
656
657impl From<NodeStateDump> for NodeStateDumpFetcher {
658 fn from(node_state_dump: NodeStateDump) -> Self {
659 let mut object_ref_pool = BTreeMap::new();
660 let mut latest_object_version_pool: BTreeMap<ObjectID, Object> = BTreeMap::new();
661
662 node_state_dump
663 .all_objects()
664 .iter()
665 .for_each(|current_obj| {
666 object_ref_pool.insert(
668 (current_obj.id, current_obj.version),
669 current_obj.object.clone(),
670 );
671
672 if let Some(last_seen_obj) = latest_object_version_pool.get(¤t_obj.id) {
674 if current_obj.version <= last_seen_obj.version() {
675 return;
676 }
677 };
678 latest_object_version_pool.insert(current_obj.id, current_obj.object.clone());
679 });
680 Self {
681 node_state_dump,
682 object_ref_pool,
683 latest_object_version_pool,
684 backup_remote_fetcher: None,
685 }
686 }
687}
688
689impl NodeStateDumpFetcher {
690 pub fn new(
691 node_state_dump: NodeStateDump,
692 backup_remote_fetcher: Option<RemoteFetcher>,
693 ) -> Self {
694 let mut s = Self::from(node_state_dump);
695 s.backup_remote_fetcher = backup_remote_fetcher;
696 s
697 }
698}
699
700#[async_trait]
701impl DataFetcher for NodeStateDumpFetcher {
702 async fn multi_get_versioned(
703 &self,
704 objects: &[(ObjectID, SequenceNumber)],
705 ) -> Result<Vec<Object>, ReplayEngineError> {
706 let mut resp = vec![];
707 match objects.iter().try_for_each(|(id, version)| {
708 if let Some(obj) = self.object_ref_pool.get(&(*id, *version)) {
709 resp.push(obj.clone());
710 return Ok(());
711 }
712 Err(ReplayEngineError::ObjectVersionNotFound {
713 id: *id,
714 version: *version,
715 })
716 }) {
717 Ok(_) => return Ok(resp),
718 Err(e) => {
719 if let Some(backup_remote_fetcher) = &self.backup_remote_fetcher {
720 return backup_remote_fetcher.multi_get_versioned(objects).await;
721 }
722 return Err(e);
723 }
724 };
725 }
726
727 async fn multi_get_latest(
728 &self,
729 objects: &[ObjectID],
730 ) -> Result<Vec<Object>, ReplayEngineError> {
731 let mut resp = vec![];
732 match objects.iter().try_for_each(|id| {
733 if let Some(obj) = self.latest_object_version_pool.get(id) {
734 resp.push(obj.clone());
735 return Ok(());
736 }
737 Err(ReplayEngineError::ObjectNotExist { id: *id })
738 }) {
739 Ok(_) => return Ok(resp),
740 Err(e) => {
741 if let Some(backup_remote_fetcher) = &self.backup_remote_fetcher {
742 return backup_remote_fetcher.multi_get_latest(objects).await;
743 }
744 return Err(e);
745 }
746 };
747 }
748
749 async fn get_checkpoint_txs(
750 &self,
751 _id: u64,
752 ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
753 unimplemented!("get_checkpoint_txs for state dump is not implemented")
754 }
755
756 async fn get_transaction(
757 &self,
758 _tx_digest: &TransactionDigest,
759 ) -> Result<IotaTransactionBlockResponse, ReplayEngineError> {
760 unimplemented!("get_transaction for state dump is not implemented")
761 }
762
763 async fn get_loaded_child_objects(
764 &self,
765 _tx_digest: &TransactionDigest,
766 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
767 Ok(self
768 .node_state_dump
769 .loaded_child_objects
770 .iter()
771 .map(|q| (q.id, q.version, q.digest))
772 .map(|w| (w.0, w.1))
773 .collect())
774 }
775
776 async fn get_latest_checkpoint_sequence_number(&self) -> Result<u64, ReplayEngineError> {
777 unimplemented!("get_latest_checkpoint_sequence_number for state dump is not implemented")
778 }
779
780 async fn fetch_random_transaction(
781 &self,
782 _checkpoint_id_start: Option<u64>,
784 _checkpoint_id_end: Option<u64>,
785 ) -> Result<TransactionDigest, ReplayEngineError> {
786 unimplemented!("fetch_random_tx for state dump is not implemented")
787 }
788
789 async fn get_epoch_start_timestamp_and_rgp(
790 &self,
791 _epoch_id: u64,
792 ) -> Result<(u64, u64), ReplayEngineError> {
793 Ok((
794 self.node_state_dump.epoch_start_timestamp_ms,
795 self.node_state_dump.reference_gas_price,
796 ))
797 }
798
799 async fn get_epoch_change_events(
800 &self,
801 _reverse: bool,
802 ) -> Result<Vec<IotaEvent>, ReplayEngineError> {
803 unimplemented!("get_epoch_change_events for state dump is not implemented")
804 }
805
806 async fn get_chain_id(&self) -> Result<String, ReplayEngineError> {
807 unimplemented!("get_chain_id for state dump is not implemented")
808 }
809
810 async fn get_child_object(
811 &self,
812 _object_id: &ObjectID,
813 _version_upper_bound: VersionNumber,
814 ) -> Result<Object, ReplayEngineError> {
815 unimplemented!("get child object is not implemented for state dump");
816 }
817}