iota_core/rest_index.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashMap,
    path::PathBuf,
    sync::{Arc, Mutex},
    time::Instant,
};

use iota_types::{
    base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
    digests::TransactionDigest,
    dynamic_field::visitor as DFV,
    full_checkpoint_content::CheckpointData,
    layout_resolver::LayoutResolver,
    messages_checkpoint::CheckpointContents,
    object::{Object, Owner},
    storage::{
        BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, error::Error as StorageError,
    },
};
use move_core_types::language_storage::StructTag;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
use typed_store::{
    DBMapUtils, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{Map, TableSummary, TypedStoreDebug},
};

use crate::{
    authority::{
        AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_tables::LiveObject,
    },
    checkpoints::CheckpointStore,
};

const CURRENT_DB_VERSION: u64 = 0;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}

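// Keys are ordered lexicographically by (owner, object_id), so all entries for
// a given owner form a contiguous range that `owner_iter` can scan with simple
// bounds.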
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: IotaAddress,
    pub object_id: ObjectID,
}

impl OwnerIndexKey {
    fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
        Self { owner, object_id }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // The object_id is already part of the `OwnerIndexKey`, so it is not repeated here.
    pub version: SequenceNumber,
    pub type_: MoveObjectType,
}

impl OwnerIndexInfo {
    pub fn new(object: &Object) -> Self {
        Self {
            version: object.version(),
            type_: object.type_().expect("packages cannot be owned").to_owned(),
        }
    }
}

#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TransactionInfo {
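    /// Sequence number of the checkpoint that included this transaction.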
    pub checkpoint: u64,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
}

impl CoinIndexInfo {
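    /// Combine two partially populated entries for the same coin type: each id
    /// is taken from `self` if present, otherwise from `other`.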
    fn merge(self, other: Self) -> Self {
        Self {
            coin_metadata_object_id: self
                .coin_metadata_object_id
                .or(other.coin_metadata_object_id),
            treasury_object_id: self.treasury_object_id.or(other.treasury_object_id),
        }
    }
}

/// RocksDB tables for the RestIndexStore
///
/// Any time a new table is added, or an existing one has its schema changed,
/// make sure to also update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers: before adding any new tables, ensure that they
/// are either:
/// - bounded in size by the live object set
/// - prune-able, with corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still
    ///   be empty post initialization)
    /// - tracking the version of the DB. Every time a table is added or a
    ///   schema is changed, the version number needs to be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from
    /// the main database.
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a
    /// specific user account.
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (child objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by
    /// a particular ObjectID.
    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to a published Coin, like the
    /// ObjectID of its corresponding CoinMetadata.
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
    // NOTE: Authors and Reviewers: before adding any new tables, ensure that they are either:
    // - bounded in size by the live object set
    // - prune-able, with corresponding logic in the `prune` function
}

impl IndexStoreTables {
    fn open<P: Into<PathBuf>>(path: P) -> Self {
        IndexStoreTables::open_tables_read_write(
            path.into(),
            MetricConf::new("rest-index"),
            None,
            None,
        )
    }

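    /// Returns true if the DB is empty, unreadable, or recorded under a
    /// different version than `CURRENT_DB_VERSION`, meaning the indexes must
    /// be (re)built.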
    fn needs_to_do_initialization(&self) -> bool {
        match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => true,
            Err(_) => true,
        }
    }

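    /// Returns true if an existing DB was written under a different version
    /// (or cannot be read) and therefore has to be dropped before
    /// re-initializing; a brand-new, empty DB does not need to be deleted.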
    fn needs_to_delete_old_db(&self) -> bool {
        match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => false,
            Err(_) => true,
        }
    }

    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Result<(), StorageError> {
        info!("Initializing REST indexes");

        // Iterate through available, executed checkpoints that have yet to be pruned
        // to initialize checkpoint and transaction based indexes.
        if let Some(highest_executed_checkpoint) =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?
        {
            let lowest_available_checkpoint = checkpoint_store
                .get_highest_pruned_checkpoint_seq_number()?
                .saturating_add(1);

            let checkpoint_range = lowest_available_checkpoint..=highest_executed_checkpoint;

            info!(
                "Indexing {} checkpoints in range {checkpoint_range:?}",
                checkpoint_range.size_hint().0
            );
            let start_time = Instant::now();

            checkpoint_range.into_par_iter().try_for_each(|seq| {
                let checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(seq)?
                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
                let contents = checkpoint_store
                    .get_checkpoint_contents(&checkpoint.content_digest)?
                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;

                let info = TransactionInfo {
                    checkpoint: checkpoint.sequence_number,
                };

                self.transactions
                    .multi_insert(contents.iter().map(|digests| (digests.transaction, info)))
                    .map_err(StorageError::from)
            })?;

            info!(
                "Indexing checkpoints took {} seconds",
                start_time.elapsed().as_secs()
            );
        }

        let coin_index = Mutex::new(HashMap::new());

        info!("Indexing Live Object Set");
        let start_time = Instant::now();
        std::thread::scope(|s| -> Result<(), StorageError> {
            let mut threads = Vec::new();
            const BITS: u8 = 5;
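            // Spawn 1 << BITS (32) scoped threads, each scanning a disjoint
            // slice of the ObjectID keyspace (see `live_object_set_index_task`).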
            for index in 0u8..(1 << BITS) {
                let this = &self;
                let coin_index = &coin_index;
                threads.push(s.spawn(move || {
                    this.live_object_set_index_task(
                        index,
                        BITS,
                        authority_store,
                        coin_index,
                        epoch_store,
                        package_store,
                    )
                }));
            }

            // join threads
            for thread in threads {
                thread.join().unwrap()?;
            }

            Ok(())
        })?;

        self.coin.multi_insert(coin_index.into_inner().unwrap())?;

        info!(
            "Indexing Live Object Set took {} seconds",
            start_time.elapsed().as_secs()
        );

        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing REST indexes");

        Ok(())
    }

    fn live_object_set_index_task(
        &self,
        task_id: u8,
        bits: u8,
        authority_store: &AuthorityStore,
        coin_index: &Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Result<(), StorageError> {
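        // Derive the [start_id, end_id] range scanned by this task: the top
        // `bits` bits of the first ObjectID byte select the slice, and the
        // remaining bits are filled with zeros (start) and ones (end).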
        let mut id_bytes = [0; ObjectID::LENGTH];
        id_bytes[0] = task_id << (8 - bits);
        let start_id = ObjectID::new(id_bytes);

        id_bytes[0] |= (1 << (8 - bits)) - 1;
        for element in id_bytes.iter_mut().skip(1) {
            *element = u8::MAX;
        }
        let end_id = ObjectID::new(id_bytes);

        let mut resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(package_store));
        let mut batch = self.owner.batch();
        let mut object_scanned: u64 = 0;
        for object in authority_store
            .perpetual_tables
            .range_iter_live_object_set(Some(start_id), Some(end_id))
            .filter_map(LiveObject::to_normal)
        {
            object_scanned += 1;
            if object_scanned % 2_000_000 == 0 {
                info!(
                    "[Index] Task {}: object scanned: {}",
                    task_id, object_scanned
                );
            }
            match object.owner {
                // Owner Index
                Owner::AddressOwner(owner) => {
                    let owner_key = OwnerIndexKey::new(owner, object.id());
                    let owner_info = OwnerIndexInfo::new(&object);
                    batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                }

                // Dynamic Field Index
                Owner::ObjectOwner(parent) => {
                    if let Some(field_info) =
                        try_create_dynamic_field_info(&object, resolver.as_mut())?
                    {
                        let field_key = DynamicFieldKey::new(parent, object.id());

                        batch.insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
                    }
                }

                Owner::Shared { .. } | Owner::Immutable => {}
            }

            // Look for CoinMetadata<T> and TreasuryCap<T> objects
            if let Some((key, value)) = try_create_coin_index_info(&object) {
                use std::collections::hash_map::Entry;

                match coin_index.lock().unwrap().entry(key) {
                    Entry::Occupied(o) => {
                        let (key, v) = o.remove_entry();
                        let value = value.merge(v);
                        batch.insert_batch(&self.coin, [(key, value)])?;
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }

            // If the batch size grows to greater than 256MB, then write it out to the DB
            // so that the data we need to hold in memory doesn't grow unbounded.
            if batch.size_in_bytes() >= 1 << 28 {
                batch.write()?;
                batch = self.owner.batch();
            }
        }

        batch.write()?;
        Ok(())
    }

    /// Prune data from this Index
    fn prune(
        &self,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.transactions.batch();

        let transactions_to_prune = checkpoint_contents_to_prune
            .iter()
            .flat_map(|contents| contents.iter().map(|digests| digests.transaction));

        batch.delete_batch(&self.transactions, transactions_to_prune)?;

        batch.write()
    }

    /// Index a Checkpoint
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        resolver: &mut dyn LayoutResolver,
    ) -> Result<(), StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.transactions.batch();

        // transactions index
        {
            let info = TransactionInfo {
                checkpoint: checkpoint.checkpoint_summary.sequence_number,
            };

            batch.insert_batch(
                &self.transactions,
                checkpoint
                    .checkpoint_contents
                    .iter()
                    .map(|digests| (digests.transaction, info)),
            )?;
        }

        // object indexes
        {
            let mut coin_index = HashMap::new();

            for tx in &checkpoint.transactions {
                // determine changes from removed objects
                for removed_object in tx.removed_objects_pre_version() {
                    match removed_object.owner() {
                        Owner::AddressOwner(address) => {
                            let owner_key = OwnerIndexKey::new(*address, removed_object.id());
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }
                        Owner::ObjectOwner(object_id) => {
                            batch.delete_batch(
                                &self.dynamic_field,
                                [DynamicFieldKey::new(*object_id, removed_object.id())],
                            )?;
                        }
                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // determine changes from changed objects
                for (object, old_object) in tx.changed_objects() {
                    if let Some(old_object) = old_object {
                        if old_object.owner() != object.owner() {
                            match old_object.owner() {
                                Owner::AddressOwner(address) => {
                                    let owner_key = OwnerIndexKey::new(*address, old_object.id());
                                    batch.delete_batch(&self.owner, [owner_key])?;
                                }

                                Owner::ObjectOwner(object_id) => {
                                    batch.delete_batch(
                                        &self.dynamic_field,
                                        [DynamicFieldKey::new(*object_id, old_object.id())],
                                    )?;
                                }

                                Owner::Shared { .. } | Owner::Immutable => {}
                            }
                        }
                    }

                    match object.owner() {
                        Owner::AddressOwner(owner) => {
                            let owner_key = OwnerIndexKey::new(*owner, object.id());
                            let owner_info = OwnerIndexInfo::new(object);
                            batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                        }
                        Owner::ObjectOwner(parent) => {
                            if let Some(field_info) =
                                try_create_dynamic_field_info(object, resolver)?
                            {
                                let field_key = DynamicFieldKey::new(*parent, object.id());

                                batch
                                    .insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
                            }
                        }
                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // coin indexing
                //
                // Coin indexing relies on the fact that CoinMetadata and TreasuryCap are
                // created in the same transaction, so we don't need to worry about
                // overriding any older value that may exist in the database (because
                // there necessarily cannot be one).
                for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                    use std::collections::hash_map::Entry;

                    match coin_index.entry(key) {
                        Entry::Occupied(o) => {
                            let (key, v) = o.remove_entry();
                            let value = value.merge(v);
                            batch.insert_batch(&self.coin, [(key, value)])?;
                        }
                        Entry::Vacant(v) => {
                            v.insert(value);
                        }
                    }
                }
            }

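            // Flush any coin entries that did not get merged with a counterpart
            // within this checkpoint.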
            batch.insert_batch(&self.coin, coin_index)?;
        }

        batch.write()?;

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );
        Ok(())
    }

    fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.transactions.get(digest)
    }

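    /// Iterate over the objects owned by `owner`, optionally resuming from the
    /// object id given as `cursor`.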
    fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
        let lower_bound = OwnerIndexKey::new(owner, ObjectID::ZERO);
        let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
        let mut iter = self
            .owner
            .iter_with_bounds(Some(lower_bound), Some(upper_bound));

        if let Some(cursor) = cursor {
            iter = iter.skip_to(&OwnerIndexKey::new(owner, cursor))?;
        }

        Ok(iter)
    }

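    /// Iterate over the dynamic fields of `parent`, optionally resuming from
    /// the field object id given as `cursor`.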
    fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
    {
        let lower_bound = DynamicFieldKey::new(parent, ObjectID::ZERO);
        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
        let mut iter = self
            .dynamic_field
            .iter_with_bounds(Some(lower_bound), Some(upper_bound));

        if let Some(cursor) = cursor {
            iter = iter.skip_to(&DynamicFieldKey::new(parent, cursor))?;
        }

        Ok(iter)
    }

    fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        let key = CoinIndexKey {
            coin_type: coin_type.to_owned(),
        };
        self.coin.get(&key)
    }
}

pub struct RestIndexStore {
    tables: IndexStoreTables,
}

impl RestIndexStore {
    pub fn new(
        path: PathBuf,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Self {
        let tables = {
            let tables = IndexStoreTables::open(&path);

            // If the index tables are uninitialized or on an older version, then we need
            // to populate them.
            if tables.needs_to_do_initialization() {
                let mut tables = if tables.needs_to_delete_old_db() {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone())
                        .expect("unable to destroy old rest-index db");
                    IndexStoreTables::open(path)
                } else {
                    tables
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                    )
                    .expect("unable to initialize rest index from live object set");
                tables
            } else {
                tables
            }
        };

        Self { tables }
    }

    pub fn new_without_init(path: PathBuf) -> Self {
        let tables = IndexStoreTables::open(path);

        Self { tables }
    }

    pub fn prune(
        &self,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables.prune(checkpoint_contents_to_prune)
    }

    pub fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        resolver: &mut dyn LayoutResolver,
    ) -> Result<(), StorageError> {
        self.tables.index_checkpoint(checkpoint, resolver)
    }

    pub fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.tables.get_transaction_info(digest)
    }

    pub fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
        self.tables.owner_iter(owner, cursor)
    }

    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }
}

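/// Build a `DynamicFieldIndexInfo` for `object` if it is a Move object whose
/// type is a dynamic `Field<..>`; returns `Ok(None)` otherwise.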
fn try_create_dynamic_field_info(
    object: &Object,
    resolver: &mut dyn LayoutResolver,
) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
    // Skip if not a move object
    let Some(move_object) = object.data.try_as_move() else {
        return Ok(None);
    };

    // Skip any objects that aren't of type `Field<Name, Value>`
    //
    // All dynamic fields are of type:
    //   - Field<Name, Value> for dynamic fields
    //   - Field<Wrapper<Name, ID>> for dynamic field objects, where the ID is
    //     the id of the pointed-to object
    //
    if !move_object.type_().is_dynamic_field() {
        return Ok(None);
    }

    let layout = resolver
        .get_annotated_layout(&move_object.type_().clone().into())
        .map_err(StorageError::custom)?
        .into_layout();

    let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
        .map_err(StorageError::custom)?;

    let value_metadata = field.value_metadata().map_err(StorageError::custom)?;

    Ok(Some(DynamicFieldIndexInfo {
        name_type: field.name_layout.into(),
        name_value: field.name_bytes.to_owned(),
        dynamic_field_type: field.kind,
        dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
            Some(id)
        } else {
            None
        },
    }))
}

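/// If `object` is a `CoinMetadata<T>` or a `TreasuryCap<T>`, return a coin
/// index entry keyed by the coin type `T`; all other objects yield `None`.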
fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
    use iota_types::coin::{CoinMetadata, TreasuryCap};

    object
        .type_()
        .and_then(MoveObjectType::other)
        .and_then(|object_type| {
            CoinMetadata::is_coin_metadata_with_coin_type(object_type)
                .cloned()
                .map(|coin_type| {
                    (
                        CoinIndexKey { coin_type },
                        CoinIndexInfo {
                            coin_metadata_object_id: Some(object.id()),
                            treasury_object_id: None,
                        },
                    )
                })
                .or_else(|| {
                    TreasuryCap::is_treasury_with_coin_type(object_type)
                        .cloned()
                        .map(|coin_type| {
                            (
                                CoinIndexKey { coin_type },
                                CoinIndexInfo {
                                    coin_metadata_object_id: None,
                                    treasury_object_id: Some(object.id()),
                                },
                            )
                        })
                })
        })
}