iota_core/
rest_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    path::PathBuf,
8    sync::{Arc, Mutex},
9    time::{Duration, Instant},
10};
11
12use iota_types::{
13    base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
14    committee::EpochId,
15    digests::TransactionDigest,
16    dynamic_field::visitor as DFV,
17    full_checkpoint_content::CheckpointData,
18    iota_system_state::IotaSystemStateTrait,
19    layout_resolver::LayoutResolver,
20    messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber},
21    object::{Object, Owner},
22    storage::{
23        BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, EpochInfo, TransactionInfo,
24        error::Error as StorageError,
25    },
26};
27use move_core_types::language_storage::StructTag;
28use rayon::iter::{IntoParallelIterator, ParallelIterator};
29use serde::{Deserialize, Serialize};
30use tracing::{debug, info};
31use typed_store::{
32    DBMapUtils, TypedStoreError,
33    rocks::{DBMap, MetricConf},
34    traits::{Map, TableSummary, TypedStoreDebug},
35};
36
37use crate::{
38    authority::{AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore},
39    checkpoints::CheckpointStore,
40    par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer},
41};
42
/// Schema version of the index database.
///
/// `needs_to_do_initialization` compares this against the version stored in
/// the `meta` table and requests re-initialization when they differ; `init`
/// stores it once the tables have been populated.
const CURRENT_DB_VERSION: u64 = 1;
44
/// Value stored in the singleton `meta` table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the database schema; checked against `CURRENT_DB_VERSION`
    /// to decide whether the tables must be rebuilt.
    version: u64,
}
50
/// Checkpoint watermark type.
///
/// Used as the key of the `watermark` table; each variant maps to a
/// checkpoint sequence number.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Written by `init` and by `index_checkpoint` with the sequence number
    /// of the checkpoint that was indexed.
    Indexed,
    /// Written by `prune` with the pruned-checkpoint watermark it was given.
    Pruned,
}
57
58#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
59pub struct OwnerIndexKey {
60    pub owner: IotaAddress,
61    pub object_id: ObjectID,
62}
63
64impl OwnerIndexKey {
65    fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
66        Self { owner, object_id }
67    }
68}
69
70#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
71pub struct OwnerIndexInfo {
72    // object_id of the object is a part of the Key
73    pub version: SequenceNumber,
74    pub type_: MoveObjectType,
75}
76
77impl OwnerIndexInfo {
78    pub fn new(object: &Object) -> Self {
79        Self {
80            version: object.version(),
81            type_: object.type_().expect("packages cannot be owned").to_owned(),
82        }
83    }
84}
85
/// Key of the `coin` table: the Move struct tag of a coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    /// The coin type this entry describes.
    coin_type: StructTag,
}
90
91#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
92pub struct CoinIndexInfo {
93    pub coin_metadata_object_id: Option<ObjectID>,
94    pub treasury_object_id: Option<ObjectID>,
95}
96
97impl CoinIndexInfo {
98    fn merge(&mut self, other: Self) {
99        self.coin_metadata_object_id = self
100            .coin_metadata_object_id
101            .or(other.coin_metadata_object_id);
102        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
103    }
104}
105
/// RocksDB tables for the RestIndexStore
///
/// Any time a new table is added, or an existing one has its schema changed,
/// make sure to also update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers before adding any new tables ensure that they
/// are either:
/// - bounded in size by the live object set
/// - are prune-able and have corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still
    ///   be empty post initialization)
    /// - version of the DB. Every time a new table is added or a schema is
    ///   changed the version number needs to be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// Table used to track watermark for the highest indexed checkpoint
    ///
    /// This is useful to help know the highest checkpoint that was indexed in
    /// the event that the node was running with indexes enabled, then run
    /// for a period of time with indexes disabled, and then run with them
    /// enabled again so that the tables can be reinitialized.
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// An index of extra metadata for Epochs.
    ///
    /// Only contains entries for epochs which have yet to be pruned from the
    /// main database.
    epochs: DBMap<EpochId, EpochInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from
    /// the main database.
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a
    /// specific user account.
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by
    /// a particular ObjectID.
    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the
    /// ObjectID of its corresponding CoinMetadata.
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
    // - bounded in size by the live object set
    // - are prune-able and have corresponding logic in the `prune` function
}
167
168impl IndexStoreTables {
169    fn open<P: Into<PathBuf>>(path: P) -> Self {
170        IndexStoreTables::open_tables_read_write(
171            path.into(),
172            MetricConf::new("rest-index"),
173            None,
174            None,
175        )
176    }
177
178    fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
179        (match self.meta.get(&()) {
180            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
181            Ok(None) => true,
182            Err(_) => true,
183        }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
184    }
185
186    // Check if the index watermark is behind the highets_executed watermark.
187    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
188        let highest_executed_checkpoint = checkpoint_store
189            .get_highest_executed_checkpoint_seq_number()
190            .ok()
191            .flatten();
192        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
193        watermark < highest_executed_checkpoint
194    }
195
196    #[tracing::instrument(skip_all)]
197    fn init(
198        &mut self,
199        authority_store: &AuthorityStore,
200        checkpoint_store: &CheckpointStore,
201        epoch_store: &AuthorityPerEpochStore,
202        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
203    ) -> Result<(), StorageError> {
204        info!("Initializing REST indexes");
205
206        let highest_executed_checkpoint =
207            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
208        let lowest_available_checkpoint = checkpoint_store
209            .get_highest_pruned_checkpoint_seq_number()?
210            .map(|c| c.saturating_add(1))
211            .unwrap_or(0);
212        let lowest_available_checkpoint_objects = authority_store
213            .perpetual_tables
214            .get_highest_pruned_checkpoint()?
215            .map(|c| c.saturating_add(1))
216            .unwrap_or(0);
217
218        // Doing backfill requires processing objects so we have to restrict our
219        // backfill range to the range of checkpoints that we have objects for.
220        let lowest_available_checkpoint =
221            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);
222
223        let checkpoint_range = highest_executed_checkpoint.map(|highest_executed_checkpoint| {
224            lowest_available_checkpoint..=highest_executed_checkpoint
225        });
226
227        if let Some(checkpoint_range) = checkpoint_range {
228            self.index_existing_transactions(authority_store, checkpoint_store, checkpoint_range)?;
229        }
230
231        self.initialize_current_epoch(authority_store, checkpoint_store)?;
232
233        let coin_index = Mutex::new(HashMap::new());
234
235        let make_live_object_indexer = RestParLiveObjectSetIndexer {
236            tables: self,
237            coin_index: &coin_index,
238            epoch_store,
239            package_store,
240        };
241
242        crate::par_index_live_object_set::par_index_live_object_set(
243            authority_store,
244            &make_live_object_indexer,
245        )?;
246
247        self.coin.multi_insert(coin_index.into_inner().unwrap())?;
248
249        self.watermark.insert(
250            &Watermark::Indexed,
251            &highest_executed_checkpoint.unwrap_or(0),
252        )?;
253
254        self.meta.insert(
255            &(),
256            &MetadataInfo {
257                version: CURRENT_DB_VERSION,
258            },
259        )?;
260
261        info!("Finished initializing REST indexes");
262
263        Ok(())
264    }
265
266    #[tracing::instrument(skip(self, authority_store, checkpoint_store))]
267    fn index_existing_transactions(
268        &mut self,
269        authority_store: &AuthorityStore,
270        checkpoint_store: &CheckpointStore,
271        checkpoint_range: std::ops::RangeInclusive<u64>,
272    ) -> Result<(), StorageError> {
273        info!(
274            "Indexing {} checkpoints in range {checkpoint_range:?}",
275            checkpoint_range.size_hint().0
276        );
277        let start_time = Instant::now();
278
279        checkpoint_range.into_par_iter().try_for_each(|seq| {
280            let checkpoint_data =
281                sparse_checkpoint_data_for_backfill(authority_store, checkpoint_store, seq)?;
282
283            let mut batch = self.transactions.batch();
284
285            self.index_epoch(&checkpoint_data, &mut batch)?;
286            self.index_transactions(&checkpoint_data, &mut batch)?;
287
288            batch.write().map_err(StorageError::from)
289        })?;
290
291        info!(
292            "Indexing checkpoints took {} seconds",
293            start_time.elapsed().as_secs()
294        );
295        Ok(())
296    }
297
298    /// Prune data from this Index
299    fn prune(
300        &self,
301        pruned_checkpoint_watermark: u64,
302        checkpoint_contents_to_prune: &[CheckpointContents],
303    ) -> Result<(), TypedStoreError> {
304        let mut batch = self.transactions.batch();
305
306        let transactions_to_prune = checkpoint_contents_to_prune
307            .iter()
308            .flat_map(|contents| contents.iter().map(|digests| digests.transaction));
309
310        batch.delete_batch(&self.transactions, transactions_to_prune)?;
311        batch.insert_batch(
312            &self.watermark,
313            [(Watermark::Pruned, pruned_checkpoint_watermark)],
314        )?;
315
316        batch.write()
317    }
318
319    /// Index a Checkpoint
320    fn index_checkpoint(
321        &self,
322        checkpoint: &CheckpointData,
323        resolver: &mut dyn LayoutResolver,
324    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
325        debug!(
326            checkpoint = checkpoint.checkpoint_summary.sequence_number,
327            "indexing checkpoint"
328        );
329
330        let mut batch = self.transactions.batch();
331
332        self.index_epoch(checkpoint, &mut batch)?;
333        self.index_transactions(checkpoint, &mut batch)?;
334        self.index_objects(checkpoint, resolver, &mut batch)?;
335
336        batch.insert_batch(
337            &self.watermark,
338            [(
339                Watermark::Indexed,
340                checkpoint.checkpoint_summary.sequence_number,
341            )],
342        )?;
343
344        debug!(
345            checkpoint = checkpoint.checkpoint_summary.sequence_number,
346            "finished indexing checkpoint"
347        );
348
349        Ok(batch)
350    }
351
352    fn index_epoch(
353        &self,
354        checkpoint: &CheckpointData,
355        batch: &mut typed_store::rocks::DBBatch,
356    ) -> Result<(), StorageError> {
357        let Some(epoch_info) = checkpoint.epoch_info()? else {
358            return Ok(());
359        };
360
361        // We need to handle closing the previous epoch by updating the entry for it, if
362        // it exists.
363        if epoch_info.epoch > 0 {
364            let prev_epoch = epoch_info.epoch - 1;
365
366            if let Some(mut previous_epoch) = self.epochs.get(&prev_epoch)? {
367                previous_epoch.end_timestamp_ms = Some(epoch_info.start_timestamp_ms);
368                previous_epoch.end_checkpoint = Some(epoch_info.start_checkpoint - 1);
369                batch.insert_batch(&self.epochs, [(prev_epoch, previous_epoch)])?;
370            }
371        }
372
373        // Insert the current epoch info
374        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
375
376        Ok(())
377    }
378
379    // After attempting to reindex past epochs, ensure that the current epoch is at
380    // least partially initialized
381    fn initialize_current_epoch(
382        &mut self,
383        authority_store: &AuthorityStore,
384        checkpoint_store: &CheckpointStore,
385    ) -> Result<(), StorageError> {
386        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
387            return Ok(());
388        };
389
390        if self.epochs.get(&checkpoint.epoch)?.is_some() {
391            // no need to initialize if it already exists
392            return Ok(());
393        }
394
395        let system_state = iota_types::iota_system_state::get_iota_system_state(authority_store)
396            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
397
398        // Determine the start checkpoint of the current epoch
399        let start_checkpoint = if checkpoint.epoch != 0 {
400            let previous_epoch = checkpoint.epoch - 1;
401
402            // Find the last checkpoint of the previous epoch
403            if let Some(previous_epoch_info) = self.epochs.get(&previous_epoch)? {
404                if let Some(end_checkpoint) = previous_epoch_info.end_checkpoint {
405                    end_checkpoint + 1
406                } else {
407                    // Fall back to scanning checkpoints if the end_checkpoint is None
408                    self.scan_for_epoch_start_checkpoint(
409                        checkpoint_store,
410                        checkpoint.sequence_number,
411                        previous_epoch,
412                    )?
413                }
414            } else {
415                // Fall back to scanning checkpoints if the previous epoch info is missing
416                self.scan_for_epoch_start_checkpoint(
417                    checkpoint_store,
418                    checkpoint.sequence_number,
419                    previous_epoch,
420                )?
421            }
422        } else {
423            // First epoch starts at checkpoint 0
424            0
425        };
426
427        let epoch_info = EpochInfo {
428            epoch: checkpoint.epoch,
429            protocol_version: system_state.protocol_version(),
430            start_timestamp_ms: system_state.epoch_start_timestamp_ms(),
431            end_timestamp_ms: None,
432            start_checkpoint,
433            end_checkpoint: None,
434            reference_gas_price: system_state.reference_gas_price(),
435            system_state,
436        };
437
438        self.epochs.insert(&epoch_info.epoch, &epoch_info)?;
439
440        Ok(())
441    }
442
443    fn scan_for_epoch_start_checkpoint(
444        &self,
445        checkpoint_store: &CheckpointStore,
446        current_checkpoint_seq_number: u64,
447        previous_epoch: EpochId,
448    ) -> Result<u64, StorageError> {
449        // Scan from current checkpoint backwards to 0 to find the start of this epoch.
450        let mut last_checkpoint_seq_number_of_prev_epoch = None;
451        for seq in (0..=current_checkpoint_seq_number).rev() {
452            let Some(chkpt) = checkpoint_store
453                .get_checkpoint_by_sequence_number(seq)
454                .ok()
455                .flatten()
456            else {
457                // continue if there is a gap in the checkpoints
458                continue;
459            };
460
461            if chkpt.epoch < previous_epoch {
462                // we must stop searching if we are past the previous epoch
463                break;
464            }
465
466            if chkpt.epoch == previous_epoch && chkpt.end_of_epoch_data.is_some() {
467                // We found the checkpoint with end of epoch data for the previous epoch
468                last_checkpoint_seq_number_of_prev_epoch = Some(chkpt.sequence_number);
469                break;
470            }
471        }
472
473        let last_checkpoint_seq_number_of_prev_epoch = last_checkpoint_seq_number_of_prev_epoch
474            .ok_or(StorageError::custom(format!(
475                "Failed to get the last checkpoint of the previous epoch {previous_epoch}",
476            )))?;
477
478        Ok(last_checkpoint_seq_number_of_prev_epoch + 1)
479    }
480
481    fn index_transactions(
482        &self,
483        checkpoint: &CheckpointData,
484        batch: &mut typed_store::rocks::DBBatch,
485    ) -> Result<(), StorageError> {
486        for tx in &checkpoint.transactions {
487            let info = TransactionInfo::new(
488                &tx.input_objects,
489                &tx.output_objects,
490                checkpoint.checkpoint_summary.sequence_number,
491            );
492
493            let digest = tx.transaction.digest();
494            batch.insert_batch(&self.transactions, [(digest, info)])?;
495        }
496
497        Ok(())
498    }
499
500    fn index_objects(
501        &self,
502        checkpoint: &CheckpointData,
503        resolver: &mut dyn LayoutResolver,
504        batch: &mut typed_store::rocks::DBBatch,
505    ) -> Result<(), StorageError> {
506        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
507
508        for tx in &checkpoint.transactions {
509            // determine changes from removed objects
510            for removed_object in tx.removed_objects_pre_version() {
511                match removed_object.owner() {
512                    Owner::AddressOwner(address) => {
513                        let owner_key = OwnerIndexKey::new(*address, removed_object.id());
514                        batch.delete_batch(&self.owner, [owner_key])?;
515                    }
516                    Owner::ObjectOwner(object_id) => {
517                        batch.delete_batch(
518                            &self.dynamic_field,
519                            [DynamicFieldKey::new(*object_id, removed_object.id())],
520                        )?;
521                    }
522                    Owner::Shared { .. } | Owner::Immutable => {}
523                }
524            }
525
526            // determine changes from changed objects
527            for (object, old_object) in tx.changed_objects() {
528                if let Some(old_object) = old_object {
529                    match old_object.owner() {
530                        Owner::AddressOwner(address) => {
531                            let owner_key = OwnerIndexKey::new(*address, old_object.id());
532                            batch.delete_batch(&self.owner, [owner_key])?;
533                        }
534
535                        Owner::ObjectOwner(object_id) => {
536                            if old_object.owner() != object.owner() {
537                                batch.delete_batch(
538                                    &self.dynamic_field,
539                                    [DynamicFieldKey::new(*object_id, old_object.id())],
540                                )?;
541                            }
542                        }
543
544                        Owner::Shared { .. } | Owner::Immutable => {}
545                    }
546                }
547
548                match object.owner() {
549                    Owner::AddressOwner(owner) => {
550                        let owner_key = OwnerIndexKey::new(*owner, object.id());
551                        let owner_info = OwnerIndexInfo::new(object);
552                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
553                    }
554                    Owner::ObjectOwner(parent) => {
555                        if let Some(field_info) = try_create_dynamic_field_info(object, resolver)? {
556                            let field_key = DynamicFieldKey::new(*parent, object.id());
557
558                            batch.insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
559                        }
560                    }
561                    Owner::Shared { .. } | Owner::Immutable => {}
562                }
563            }
564
565            // coin indexing
566            //
567            // coin indexing relies on the fact that CoinMetadata and TreasuryCap are
568            // created in the same transaction so we don't need to worry about
569            // overriding any older value that may exist in the database
570            // (because there necessarily cannot be).
571            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
572                use std::collections::hash_map::Entry;
573
574                match coin_index.entry(key) {
575                    Entry::Occupied(mut o) => {
576                        o.get_mut().merge(value);
577                    }
578                    Entry::Vacant(v) => {
579                        v.insert(value);
580                    }
581                }
582            }
583        }
584
585        batch.insert_batch(&self.coin, coin_index)?;
586
587        Ok(())
588    }
589
590    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
591        self.epochs.get(&epoch)
592    }
593
594    fn get_transaction_info(
595        &self,
596        digest: &TransactionDigest,
597    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
598        self.transactions.get(digest)
599    }
600
601    fn owner_iter(
602        &self,
603        owner: IotaAddress,
604        cursor: Option<ObjectID>,
605    ) -> Result<
606        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
607        TypedStoreError,
608    > {
609        let lower_bound = OwnerIndexKey::new(owner, cursor.unwrap_or(ObjectID::ZERO));
610        let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
611        Ok(self
612            .owner
613            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
614    }
615
616    fn dynamic_field_iter(
617        &self,
618        parent: ObjectID,
619        cursor: Option<ObjectID>,
620    ) -> Result<
621        impl Iterator<Item = Result<(DynamicFieldKey, DynamicFieldIndexInfo), TypedStoreError>> + '_,
622        TypedStoreError,
623    > {
624        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
625        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
626        let iter = self
627            .dynamic_field
628            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound));
629        Ok(iter)
630    }
631
632    fn get_coin_info(
633        &self,
634        coin_type: &StructTag,
635    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
636        let key = CoinIndexKey {
637            coin_type: coin_type.to_owned(),
638        };
639        self.coin.get(&key)
640    }
641}
642
/// Public handle to the REST index: the index tables plus the write batches
/// that have been staged by `index_checkpoint` but not yet committed.
pub struct RestIndexStore {
    /// The underlying index tables.
    tables: IndexStoreTables,
    /// Staged batches keyed by checkpoint sequence number. Filled by
    /// `index_checkpoint`; `commit_update_for_checkpoint` removes and writes
    /// the entry with the lowest sequence number.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
}
647
648impl RestIndexStore {
649    pub async fn new(
650        path: PathBuf,
651        authority_store: &AuthorityStore,
652        checkpoint_store: &CheckpointStore,
653        epoch_store: &AuthorityPerEpochStore,
654        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
655    ) -> Self {
656        let tables = {
657            let tables = IndexStoreTables::open(&path);
658
659            // If the index tables are uninitialized or on an older version then we need to
660            // populate them
661            if tables.needs_to_do_initialization(checkpoint_store) {
662                let mut tables = {
663                    drop(tables);
664                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
665                        .await
666                        .expect("unable to destroy old rpc-index db");
667                    IndexStoreTables::open(path)
668                };
669
670                tables
671                    .init(
672                        authority_store,
673                        checkpoint_store,
674                        epoch_store,
675                        package_store,
676                    )
677                    .expect("unable to initialize rest index from live object set");
678                tables
679            } else {
680                tables
681            }
682        };
683
684        Self {
685            tables,
686            pending_updates: Default::default(),
687        }
688    }
689
690    pub fn new_without_init(path: PathBuf) -> Self {
691        let tables = IndexStoreTables::open(path);
692
693        Self {
694            tables,
695            pending_updates: Default::default(),
696        }
697    }
698
699    pub fn prune(
700        &self,
701        pruned_checkpoint_watermark: u64,
702        checkpoint_contents_to_prune: &[CheckpointContents],
703    ) -> Result<(), TypedStoreError> {
704        self.tables
705            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
706    }
707
708    /// Index a checkpoint and stage the index updated in `pending_updates`.
709    ///
710    /// Updates will not be committed to the database until
711    /// `commit_update_for_checkpoint` is called.
712    #[tracing::instrument(
713        skip_all,
714        fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
715    )]
716    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
717        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
718        let batch = self
719            .tables
720            .index_checkpoint(checkpoint, resolver)
721            .expect("db error");
722
723        self.pending_updates
724            .lock()
725            .unwrap()
726            .insert(sequence_number, batch);
727    }
728
729    /// Commits the pending updates for the provided checkpoint number.
730    ///
731    /// Invariants:
732    /// - `index_checkpoint` must have been called for the provided checkpoint
733    /// - Callers of this function must ensure that it is called for each
734    ///   checkpoint in sequential order. This will panic if the provided
735    ///   checkpoint does not match the expected next checkpoint to commit.
736    #[tracing::instrument(skip(self))]
737    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
738        let next_batch = self.pending_updates.lock().unwrap().pop_first();
739
740        // Its expected that the next batch exists
741        let (next_sequence_number, batch) = next_batch.unwrap();
742        assert_eq!(
743            checkpoint, next_sequence_number,
744            "commit_update_for_checkpoint must be called in order"
745        );
746
747        Ok(batch.write()?)
748    }
749
750    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
751        self.tables.get_epoch_info(epoch)
752    }
753
754    pub fn get_transaction_info(
755        &self,
756        digest: &TransactionDigest,
757    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
758        self.tables.get_transaction_info(digest)
759    }
760
761    pub fn owner_iter(
762        &self,
763        owner: IotaAddress,
764        cursor: Option<ObjectID>,
765    ) -> Result<
766        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
767        TypedStoreError,
768    > {
769        self.tables.owner_iter(owner, cursor)
770    }
771
772    pub fn dynamic_field_iter(
773        &self,
774        parent: ObjectID,
775        cursor: Option<ObjectID>,
776    ) -> Result<
777        impl Iterator<Item = Result<(DynamicFieldKey, DynamicFieldIndexInfo), TypedStoreError>> + '_,
778        TypedStoreError,
779    > {
780        self.tables.dynamic_field_iter(parent, cursor)
781    }
782
783    pub fn get_coin_info(
784        &self,
785        coin_type: &StructTag,
786    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
787        self.tables.get_coin_info(coin_type)
788    }
789}
790
791fn try_create_dynamic_field_info(
792    object: &Object,
793    resolver: &mut dyn LayoutResolver,
794) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
795    // Skip if not a move object
796    let Some(move_object) = object.data.try_as_move() else {
797        return Ok(None);
798    };
799
800    // Skip any objects that aren't of type `Field<Name, Value>`
801    //
802    // All dynamic fields are of type:
803    //   - Field<Name, Value> for dynamic fields
804    //   - Field<Wrapper<Name, ID>> for dynamic field objects where the ID is the id
805    //     of the pointed
806    //   to object
807    //
808    if !move_object.type_().is_dynamic_field() {
809        return Ok(None);
810    }
811
812    let layout = resolver
813        .get_annotated_layout(&move_object.type_().clone().into())
814        .map_err(StorageError::custom)?
815        .into_layout();
816
817    let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
818        .map_err(StorageError::custom)?;
819
820    let value_metadata = field.value_metadata().map_err(StorageError::custom)?;
821
822    Ok(Some(DynamicFieldIndexInfo {
823        name_type: field.name_layout.into(),
824        name_value: field.name_bytes.to_owned(),
825        dynamic_field_type: field.kind,
826        dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
827            Some(id)
828        } else {
829            None
830        },
831    }))
832}
833
834fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
835    use iota_types::coin::{CoinMetadata, TreasuryCap};
836
837    object
838        .type_()
839        .and_then(MoveObjectType::other)
840        .and_then(|object_type| {
841            CoinMetadata::is_coin_metadata_with_coin_type(object_type)
842                .cloned()
843                .map(|coin_type| {
844                    (
845                        CoinIndexKey { coin_type },
846                        CoinIndexInfo {
847                            coin_metadata_object_id: Some(object.id()),
848                            treasury_object_id: None,
849                        },
850                    )
851                })
852                .or_else(|| {
853                    TreasuryCap::is_treasury_with_coin_type(object_type)
854                        .cloned()
855                        .map(|coin_type| {
856                            (
857                                CoinIndexKey { coin_type },
858                                CoinIndexInfo {
859                                    coin_metadata_object_id: None,
860                                    treasury_object_id: Some(object.id()),
861                                },
862                            )
863                        })
864                })
865        })
866}
867
868struct RestParLiveObjectSetIndexer<'a> {
869    tables: &'a IndexStoreTables,
870    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
871    epoch_store: &'a AuthorityPerEpochStore,
872    package_store: &'a Arc<dyn BackingPackageStore + Send + Sync>,
873}
874
875struct RestLiveObjectIndexer<'a> {
876    tables: &'a IndexStoreTables,
877    batch: typed_store::rocks::DBBatch,
878    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
879    resolver: Box<dyn LayoutResolver + 'a>,
880}
881
882impl<'a> ParMakeLiveObjectIndexer for RestParLiveObjectSetIndexer<'a> {
883    type ObjectIndexer = RestLiveObjectIndexer<'a>;
884
885    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
886        RestLiveObjectIndexer {
887            tables: self.tables,
888            batch: self.tables.owner.batch(),
889            coin_index: self.coin_index,
890            resolver: self
891                .epoch_store
892                .executor()
893                .type_layout_resolver(Box::new(self.package_store)),
894        }
895    }
896}
897
898impl LiveObjectIndexer for RestLiveObjectIndexer<'_> {
899    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
900        match object.owner {
901            // Owner Index
902            Owner::AddressOwner(owner) => {
903                let owner_key = OwnerIndexKey::new(owner, object.id());
904                let owner_info = OwnerIndexInfo::new(&object);
905                self.batch
906                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
907            }
908
909            // Dynamic Field Index
910            Owner::ObjectOwner(parent) => {
911                if let Some(field_info) =
912                    try_create_dynamic_field_info(&object, self.resolver.as_mut())?
913                {
914                    let field_key = DynamicFieldKey::new(parent, object.id());
915
916                    self.batch
917                        .insert_batch(&self.tables.dynamic_field, [(field_key, field_info)])?;
918                }
919            }
920
921            Owner::Shared { .. } | Owner::Immutable => {}
922        }
923
924        // Look for CoinMetadata<T> and TreasuryCap<T> objects
925        if let Some((key, value)) = try_create_coin_index_info(&object) {
926            use std::collections::hash_map::Entry;
927
928            match self.coin_index.lock().unwrap().entry(key) {
929                Entry::Occupied(mut o) => {
930                    o.get_mut().merge(value);
931                }
932                Entry::Vacant(v) => {
933                    v.insert(value);
934                }
935            }
936        }
937
938        // If the batch size grows to greater that 128MB then write out to the DB so
939        // that the data we need to hold in memory doesn't grown unbounded.
940        if self.batch.size_in_bytes() >= 1 << 27 {
941            std::mem::replace(&mut self.batch, self.tables.owner.batch()).write()?;
942        }
943
944        Ok(())
945    }
946
947    fn finish(self) -> Result<(), StorageError> {
948        self.batch.write()?;
949        Ok(())
950    }
951}
952
953// TODO figure out a way to dedup this logic. Today we'd need to do quite a bit
954// of refactoring to make it possible.
955//
956// Load a CheckpointData struct without event data
957fn sparse_checkpoint_data_for_backfill(
958    authority_store: &AuthorityStore,
959    checkpoint_store: &CheckpointStore,
960    checkpoint: u64,
961) -> Result<CheckpointData, StorageError> {
962    use iota_types::full_checkpoint_content::CheckpointTransaction;
963
964    let summary = checkpoint_store
965        .get_checkpoint_by_sequence_number(checkpoint)?
966        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
967    let contents = checkpoint_store
968        .get_checkpoint_contents(&summary.content_digest)?
969        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
970
971    let transaction_digests = contents
972        .iter()
973        .map(|execution_digests| execution_digests.transaction)
974        .collect::<Vec<_>>();
975    let transactions = authority_store
976        .multi_get_transaction_blocks(&transaction_digests)?
977        .into_iter()
978        .map(|maybe_transaction| {
979            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
980        })
981        .collect::<Result<Vec<_>, _>>()?;
982
983    let effects = authority_store
984        .multi_get_executed_effects(&transaction_digests)?
985        .into_iter()
986        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
987        .collect::<Result<Vec<_>, _>>()?;
988
989    let mut full_transactions = Vec::with_capacity(transactions.len());
990    for (tx, fx) in transactions.into_iter().zip(effects) {
991        let input_objects =
992            iota_types::storage::get_transaction_input_objects(authority_store, &fx)?;
993        let output_objects =
994            iota_types::storage::get_transaction_output_objects(authority_store, &fx)?;
995
996        let full_transaction = CheckpointTransaction {
997            transaction: tx.into(),
998            effects: fx,
999            events: None,
1000            input_objects,
1001            output_objects,
1002        };
1003
1004        full_transactions.push(full_transaction);
1005    }
1006
1007    let checkpoint_data = CheckpointData {
1008        checkpoint_summary: summary.into(),
1009        checkpoint_contents: contents,
1010        transactions: full_transactions,
1011    };
1012
1013    Ok(checkpoint_data)
1014}