use std::{
    collections::{BTreeMap, HashMap},
    path::PathBuf,
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

use iota_types::{
    base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
    committee::EpochId,
    digests::TransactionDigest,
    dynamic_field::visitor as DFV,
    full_checkpoint_content::CheckpointData,
    iota_system_state::IotaSystemStateTrait,
    layout_resolver::LayoutResolver,
    messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber},
    object::{Object, Owner},
    storage::{
        BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, EpochInfo, TransactionInfo,
        error::Error as StorageError,
    },
};
use move_core_types::language_storage::StructTag;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
use typed_store::{
    DBMapUtils, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{Map, TableSummary, TypedStoreDebug},
};

use crate::{
    authority::{AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::CheckpointStore,
    par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer},
};

const CURRENT_DB_VERSION: u64 = 1;

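/// Schema metadata for the index DB; a `version` that differs from
/// `CURRENT_DB_VERSION` triggers a full rebuild of the index on startup.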
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    version: u64,
}

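/// Progress watermarks: `Indexed` is the highest checkpoint whose data has
/// been written to the index, `Pruned` the highest checkpoint whose
/// transactions have been pruned from it.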
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    Indexed,
    Pruned,
}

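/// Key of the `owner` table. Keys order first by owner, so all objects of one
/// address are adjacent and can be range-scanned (see `owner_iter`).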
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: IotaAddress,
    pub object_id: ObjectID,
}

impl OwnerIndexKey {
    fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
        Self { owner, object_id }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    pub version: SequenceNumber,
    pub type_: MoveObjectType,
}

impl OwnerIndexInfo {
    pub fn new(object: &Object) -> Self {
        Self {
            version: object.version(),
            type_: object.type_().expect("packages cannot be owned").to_owned(),
        }
    }
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
}

impl CoinIndexInfo {
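    /// Combines two partial records, keeping the first-seen id for each field.
    ///
    /// A minimal sketch (`metadata_id` and `treasury_id` are hypothetical):
    ///
    /// ```ignore
    /// let mut info = CoinIndexInfo {
    ///     coin_metadata_object_id: Some(metadata_id),
    ///     treasury_object_id: None,
    /// };
    /// info.merge(CoinIndexInfo {
    ///     coin_metadata_object_id: None,
    ///     treasury_object_id: Some(treasury_id),
    /// });
    /// // `info` now carries both ids.
    /// ```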
    fn merge(&mut self, other: Self) {
        self.coin_metadata_object_id = self
            .coin_metadata_object_id
            .or(other.coin_metadata_object_id);
        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
    }
}

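/// RocksDB tables backing the REST index; one `DBMap` per column family.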
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Schema metadata, stored under the unit key.
    meta: DBMap<(), MetadataInfo>,

    /// Indexing and pruning progress watermarks.
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// Epoch id to information about that epoch.
    epochs: DBMap<EpochId, EpochInfo>,

    /// Transaction digest to information about that transaction.
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// Index of address-owned objects, keyed by (owner, object id).
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// Index of dynamic fields, keyed by (parent object id, field object id).
    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,

    /// Coin type to its `CoinMetadata` and `TreasuryCap` object ids.
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
}

impl IndexStoreTables {
    fn open<P: Into<PathBuf>>(path: P) -> Self {
        IndexStoreTables::open_tables_read_write(
            path.into(),
            MetricConf::new("rest-index"),
            None,
            None,
        )
    }

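    /// Returns true if the index must be rebuilt: the stored schema version is
    /// missing, unreadable, or different from `CURRENT_DB_VERSION`, or the
    /// indexed watermark lags behind the highest executed checkpoint.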
    fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
        (match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => true,
            Err(_) => true,
        }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
    }

    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
        let highest_executed_checkpoint = checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .ok()
            .flatten();
        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
        // `Option` ordering makes a missing watermark (`None`) compare less
        // than any executed checkpoint, so an empty index counts as stale.
        watermark < highest_executed_checkpoint
    }

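    /// Rebuilds the index from scratch: backfills transaction and epoch data
    /// for all locally available checkpoints, seeds the current epoch entry,
    /// indexes the live object set in parallel, and finally records the
    /// `Indexed` watermark and schema version.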
    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Result<(), StorageError> {
        info!("Initializing REST indexes");

        let highest_executed_checkpoint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);

        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        let checkpoint_range = highest_executed_checkpoint.map(|highest_executed_checkpoint| {
            lowest_available_checkpoint..=highest_executed_checkpoint
        });

        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_transactions(authority_store, checkpoint_store, checkpoint_range)?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        let coin_index = Mutex::new(HashMap::new());

        let make_live_object_indexer = RestParLiveObjectSetIndexer {
            tables: self,
            coin_index: &coin_index,
            epoch_store,
            package_store,
        };

        crate::par_index_live_object_set::par_index_live_object_set(
            authority_store,
            &make_live_object_indexer,
        )?;

        self.coin.multi_insert(coin_index.into_inner().unwrap())?;

        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpoint.unwrap_or(0),
        )?;

        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing REST indexes");

        Ok(())
    }

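    /// Backfills the `transactions` and `epochs` tables for the given range,
    /// processing checkpoints in parallel with rayon.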
    #[tracing::instrument(skip(self, authority_store, checkpoint_store))]
    fn index_existing_transactions(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        checkpoint_range: std::ops::RangeInclusive<u64>,
    ) -> Result<(), StorageError> {
        info!(
            "Indexing {} checkpoints in range {checkpoint_range:?}",
            checkpoint_range.size_hint().0
        );
        let start_time = Instant::now();

        checkpoint_range.into_par_iter().try_for_each(|seq| {
            let checkpoint_data =
                sparse_checkpoint_data_for_backfill(authority_store, checkpoint_store, seq)?;

            let mut batch = self.transactions.batch();

            self.index_epoch(&checkpoint_data, &mut batch)?;
            self.index_transactions(&checkpoint_data, &mut batch)?;

            batch.write().map_err(StorageError::from)
        })?;

        info!(
            "Indexing checkpoints took {} seconds",
            start_time.elapsed().as_secs()
        );
        Ok(())
    }

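    /// Deletes the indexed transactions of pruned checkpoints and advances the
    /// `Pruned` watermark in a single batch.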
    fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.transactions.batch();

        let transactions_to_prune = checkpoint_contents_to_prune
            .iter()
            .flat_map(|contents| contents.iter().map(|digests| digests.transaction));

        batch.delete_batch(&self.transactions, transactions_to_prune)?;
        batch.insert_batch(
            &self.watermark,
            [(Watermark::Pruned, pruned_checkpoint_watermark)],
        )?;

        batch.write()
    }

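    /// Builds (but does not write) the batch of index updates for one
    /// checkpoint; the caller is responsible for committing batches in
    /// checkpoint order.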
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        resolver: &mut dyn LayoutResolver,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.transactions.batch();

        self.index_epoch(checkpoint, &mut batch)?;
        self.index_transactions(checkpoint, &mut batch)?;
        self.index_objects(checkpoint, resolver, &mut batch)?;

        batch.insert_batch(
            &self.watermark,
            [(
                Watermark::Indexed,
                checkpoint.checkpoint_summary.sequence_number,
            )],
        )?;

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }

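    /// On an epoch boundary, closes out the previous epoch's entry (end
    /// timestamp and end checkpoint) and inserts the new epoch's info.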
    fn index_epoch(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let Some(epoch_info) = checkpoint.epoch_info()? else {
            return Ok(());
        };

        if epoch_info.epoch > 0 {
            let prev_epoch = epoch_info.epoch - 1;

            if let Some(mut previous_epoch) = self.epochs.get(&prev_epoch)? {
                previous_epoch.end_timestamp_ms = Some(epoch_info.start_timestamp_ms);
                previous_epoch.end_checkpoint = Some(epoch_info.start_checkpoint - 1);
                batch.insert_batch(&self.epochs, [(prev_epoch, previous_epoch)])?;
            }
        }

        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;

        Ok(())
    }

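    /// Ensures an `epochs` entry exists for the epoch of the highest executed
    /// checkpoint. The start checkpoint is derived from the previous epoch's
    /// recorded end, or found by scanning backwards when that is unknown;
    /// epoch 0 starts at checkpoint 0.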
    fn initialize_current_epoch(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
    ) -> Result<(), StorageError> {
        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
            return Ok(());
        };

        if self.epochs.get(&checkpoint.epoch)?.is_some() {
            return Ok(());
        }

        let system_state = iota_types::iota_system_state::get_iota_system_state(authority_store)
            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;

        let start_checkpoint = if checkpoint.epoch != 0 {
            let previous_epoch = checkpoint.epoch - 1;

            if let Some(previous_epoch_info) = self.epochs.get(&previous_epoch)? {
                if let Some(end_checkpoint) = previous_epoch_info.end_checkpoint {
                    end_checkpoint + 1
                } else {
                    self.scan_for_epoch_start_checkpoint(
                        checkpoint_store,
                        checkpoint.sequence_number,
                        previous_epoch,
                    )?
                }
            } else {
                self.scan_for_epoch_start_checkpoint(
                    checkpoint_store,
                    checkpoint.sequence_number,
                    previous_epoch,
                )?
            }
        } else {
            0
        };

        let epoch_info = EpochInfo {
            epoch: checkpoint.epoch,
            protocol_version: system_state.protocol_version(),
            start_timestamp_ms: system_state.epoch_start_timestamp_ms(),
            end_timestamp_ms: None,
            start_checkpoint,
            end_checkpoint: None,
            reference_gas_price: system_state.reference_gas_price(),
            system_state,
        };

        self.epochs.insert(&epoch_info.epoch, &epoch_info)?;

        Ok(())
    }

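    /// Walks backwards from `current_checkpoint_seq_number` to the last
    /// checkpoint of `previous_epoch` (the one carrying `end_of_epoch_data`)
    /// and returns the sequence number right after it as the epoch start.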
    fn scan_for_epoch_start_checkpoint(
        &self,
        checkpoint_store: &CheckpointStore,
        current_checkpoint_seq_number: u64,
        previous_epoch: EpochId,
    ) -> Result<u64, StorageError> {
        let mut last_checkpoint_seq_number_of_prev_epoch = None;
        for seq in (0..=current_checkpoint_seq_number).rev() {
            let Some(chkpt) = checkpoint_store
                .get_checkpoint_by_sequence_number(seq)
                .ok()
                .flatten()
            else {
                continue;
            };

            if chkpt.epoch < previous_epoch {
                break;
            }

            if chkpt.epoch == previous_epoch && chkpt.end_of_epoch_data.is_some() {
                last_checkpoint_seq_number_of_prev_epoch = Some(chkpt.sequence_number);
                break;
            }
        }

        let last_checkpoint_seq_number_of_prev_epoch = last_checkpoint_seq_number_of_prev_epoch
            .ok_or(StorageError::custom(format!(
                "Failed to get the last checkpoint of the previous epoch {previous_epoch}",
            )))?;

        Ok(last_checkpoint_seq_number_of_prev_epoch + 1)
    }

    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        for tx in &checkpoint.transactions {
            let info = TransactionInfo::new(
                &tx.input_objects,
                &tx.output_objects,
                checkpoint.checkpoint_summary.sequence_number,
            );

            let digest = tx.transaction.digest();
            batch.insert_batch(&self.transactions, [(digest, info)])?;
        }

        Ok(())
    }

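    /// Maintains the `owner`, `dynamic_field` and `coin` tables from a
    /// checkpoint's object changes: entries for removed objects and stale
    /// owners are deleted, then entries for the new versions are inserted.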
    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        resolver: &mut dyn LayoutResolver,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();

        for tx in &checkpoint.transactions {
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    Owner::AddressOwner(address) => {
                        let owner_key = OwnerIndexKey::new(*address, removed_object.id());
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            for (object, old_object) in tx.changed_objects() {
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(address) => {
                            let owner_key = OwnerIndexKey::new(*address, old_object.id());
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                match object.owner() {
                    Owner::AddressOwner(owner) => {
                        let owner_key = OwnerIndexKey::new(*owner, object.id());
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if let Some(field_info) = try_create_dynamic_field_info(object, resolver)? {
                            let field_key = DynamicFieldKey::new(*parent, object.id());

                            batch.insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;

        Ok(())
    }

    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.epochs.get(&epoch)
    }

    fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.transactions.get(digest)
    }

    fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        let lower_bound = OwnerIndexKey::new(owner, cursor.unwrap_or(ObjectID::ZERO));
        let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
        Ok(self
            .owner
            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
    }

    fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<
        impl Iterator<Item = Result<(DynamicFieldKey, DynamicFieldIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
        let iter = self
            .dynamic_field
            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound));
        Ok(iter)
    }

    fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        let key = CoinIndexKey {
            coin_type: coin_type.to_owned(),
        };
        self.coin.get(&key)
    }
}

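/// Public handle over the index tables. `pending_updates` buffers one write
/// batch per indexed checkpoint until it is committed in sequence order.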
pub struct RestIndexStore {
    tables: IndexStoreTables,
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
}

impl RestIndexStore {
    pub async fn new(
        path: PathBuf,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Self {
        let tables = {
            let tables = IndexStoreTables::open(&path);

            // If the index is out of date or on an old schema version, drop
            // the DB and rebuild it from scratch.
            if tables.needs_to_do_initialization(checkpoint_store) {
                let mut tables = {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rest-index db");
                    IndexStoreTables::open(path)
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                    )
                    .expect("unable to initialize rest index from live object set");
                tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
        }
    }

    pub fn new_without_init(path: PathBuf) -> Self {
        let tables = IndexStoreTables::open(path);

        Self {
            tables,
            pending_updates: Default::default(),
        }
    }

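    /// Removes pruned checkpoints' transactions from the index; see
    /// [`IndexStoreTables::prune`].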
    pub fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables
            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
    }

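    /// Stages the index updates for `checkpoint` as a pending batch; the batch
    /// is only written once `commit_update_for_checkpoint` is called with the
    /// same sequence number.
    ///
    /// A minimal sketch of the intended call pattern (`store`, `data`, `seq`,
    /// and `resolver` are hypothetical):
    ///
    /// ```ignore
    /// store.index_checkpoint(&data, &mut resolver); // stage the batch
    /// store.commit_update_for_checkpoint(seq)?;     // later, in order
    /// ```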
    #[tracing::instrument(
        skip_all,
        fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
    )]
    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self
            .tables
            .index_checkpoint(checkpoint, resolver)
            .expect("db error");

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);
    }

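    /// Writes the pending batch for `checkpoint`. Batches must be committed in
    /// checkpoint order; this panics if called out of order or when no batch
    /// has been staged.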
    #[tracing::instrument(skip(self))]
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }

    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.tables.get_epoch_info(epoch)
    }

    pub fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.tables.get_transaction_info(digest)
    }

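    /// Iterates over the objects owned by `owner`, starting at `cursor` (or at
    /// the first object when `None`).
    ///
    /// A minimal usage sketch, assuming an initialized store named `index` and
    /// an address `owner` (both hypothetical):
    ///
    /// ```ignore
    /// for entry in index.owner_iter(owner, None)? {
    ///     let (key, info) = entry?;
    ///     println!("{} at version {:?}", key.object_id, info.version);
    /// }
    /// ```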
    pub fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.owner_iter(owner, cursor)
    }

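    /// Iterates over the dynamic fields of `parent`, starting at `cursor` (or
    /// at the first field when `None`).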
    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<
        impl Iterator<Item = Result<(DynamicFieldKey, DynamicFieldIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }
}

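/// Extracts dynamic-field index info from `object`, returning `None` for
/// non-Move objects and for Move objects whose type is not a dynamic field.
/// The annotated layout is resolved so the field's name and value metadata
/// can be read out of the raw contents.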
fn try_create_dynamic_field_info(
    object: &Object,
    resolver: &mut dyn LayoutResolver,
) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
    let Some(move_object) = object.data.try_as_move() else {
        return Ok(None);
    };

    if !move_object.type_().is_dynamic_field() {
        return Ok(None);
    }

    let layout = resolver
        .get_annotated_layout(&move_object.type_().clone().into())
        .map_err(StorageError::custom)?
        .into_layout();

    let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
        .map_err(StorageError::custom)?;

    let value_metadata = field.value_metadata().map_err(StorageError::custom)?;

    Ok(Some(DynamicFieldIndexInfo {
        name_type: field.name_layout.into(),
        name_value: field.name_bytes.to_owned(),
        dynamic_field_type: field.kind,
        dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
            Some(id)
        } else {
            None
        },
    }))
}

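/// Recognizes `CoinMetadata<T>` and `TreasuryCap<T>` objects and maps them to
/// a partial `CoinIndexInfo` keyed by the coin type `T`; the two halves are
/// later combined via `CoinIndexInfo::merge`.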
fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
    use iota_types::coin::{CoinMetadata, TreasuryCap};

    object
        .type_()
        .and_then(MoveObjectType::other)
        .and_then(|object_type| {
            CoinMetadata::is_coin_metadata_with_coin_type(object_type)
                .cloned()
                .map(|coin_type| {
                    (
                        CoinIndexKey { coin_type },
                        CoinIndexInfo {
                            coin_metadata_object_id: Some(object.id()),
                            treasury_object_id: None,
                        },
                    )
                })
                .or_else(|| {
                    TreasuryCap::is_treasury_with_coin_type(object_type)
                        .cloned()
                        .map(|coin_type| {
                            (
                                CoinIndexKey { coin_type },
                                CoinIndexInfo {
                                    coin_metadata_object_id: None,
                                    treasury_object_id: Some(object.id()),
                                },
                            )
                        })
                })
        })
}

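/// Factory used by the parallel live-object-set indexer: each worker gets its
/// own `RestLiveObjectIndexer` with a private write batch and layout resolver,
/// while the coin index is merged through a shared mutex.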
struct RestParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    epoch_store: &'a AuthorityPerEpochStore,
    package_store: &'a Arc<dyn BackingPackageStore + Send + Sync>,
}

struct RestLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    batch: typed_store::rocks::DBBatch,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    resolver: Box<dyn LayoutResolver + 'a>,
}

impl<'a> ParMakeLiveObjectIndexer for RestParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RestLiveObjectIndexer<'a>;

    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RestLiveObjectIndexer {
            tables: self.tables,
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            resolver: self
                .epoch_store
                .executor()
                .type_layout_resolver(Box::new(self.package_store)),
        }
    }
}

impl LiveObjectIndexer for RestLiveObjectIndexer<'_> {
    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
        match object.owner {
            Owner::AddressOwner(owner) => {
                let owner_key = OwnerIndexKey::new(owner, object.id());
                let owner_info = OwnerIndexInfo::new(&object);
                self.batch
                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
            }

            Owner::ObjectOwner(parent) => {
                if let Some(field_info) =
                    try_create_dynamic_field_info(&object, self.resolver.as_mut())?
                {
                    let field_key = DynamicFieldKey::new(parent, object.id());

                    self.batch
                        .insert_batch(&self.tables.dynamic_field, [(field_key, field_info)])?;
                }
            }

            Owner::Shared { .. } | Owner::Immutable => {}
        }

        if let Some((key, value)) = try_create_coin_index_info(&object) {
            use std::collections::hash_map::Entry;

            match self.coin_index.lock().unwrap().entry(key) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge(value);
                }
                Entry::Vacant(v) => {
                    v.insert(value);
                }
            }
        }

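        // Flush the batch to disk once it grows past 1 << 27 bytes (128 MiB)
        // to bound per-worker memory usage during the live-object scan.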
        if self.batch.size_in_bytes() >= 1 << 27 {
            std::mem::replace(&mut self.batch, self.tables.owner.batch()).write()?;
        }

        Ok(())
    }

    fn finish(self) -> Result<(), StorageError> {
        self.batch.write()?;
        Ok(())
    }
}

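/// Reconstructs a `CheckpointData` for backfill from locally stored pieces:
/// summary, contents, transactions and effects. Events are omitted (`None`),
/// which is sufficient for the transaction and epoch indexing done here.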
fn sparse_checkpoint_data_for_backfill(
    authority_store: &AuthorityStore,
    checkpoint_store: &CheckpointStore,
    checkpoint: u64,
) -> Result<CheckpointData, StorageError> {
    use iota_types::full_checkpoint_content::CheckpointTransaction;

    let summary = checkpoint_store
        .get_checkpoint_by_sequence_number(checkpoint)?
        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
    let contents = checkpoint_store
        .get_checkpoint_contents(&summary.content_digest)?
        .ok_or_else(|| {
            StorageError::missing(format!("missing checkpoint contents {checkpoint}"))
        })?;

    let transaction_digests = contents
        .iter()
        .map(|execution_digests| execution_digests.transaction)
        .collect::<Vec<_>>();
    let transactions = authority_store
        .multi_get_transaction_blocks(&transaction_digests)?
        .into_iter()
        .map(|maybe_transaction| {
            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
        })
        .collect::<Result<Vec<_>, _>>()?;

    let effects = authority_store
        .multi_get_executed_effects(&transaction_digests)?
        .into_iter()
        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
        .collect::<Result<Vec<_>, _>>()?;

    let mut full_transactions = Vec::with_capacity(transactions.len());
    for (tx, fx) in transactions.into_iter().zip(effects) {
        let input_objects =
            iota_types::storage::get_transaction_input_objects(authority_store, &fx)?;
        let output_objects =
            iota_types::storage::get_transaction_output_objects(authority_store, &fx)?;

        let full_transaction = CheckpointTransaction {
            transaction: tx.into(),
            effects: fx,
            events: None,
            input_objects,
            output_objects,
        };

        full_transactions.push(full_transaction);
    }

    let checkpoint_data = CheckpointData {
        checkpoint_summary: summary.into(),
        checkpoint_contents: contents,
        transactions: full_transactions,
    };

    Ok(checkpoint_data)
}