1use std::{
6 collections::HashMap,
7 path::PathBuf,
8 sync::{Arc, Mutex},
9 time::Instant,
10};
11
12use iota_types::{
13 base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
14 digests::TransactionDigest,
15 dynamic_field::visitor as DFV,
16 full_checkpoint_content::CheckpointData,
17 layout_resolver::LayoutResolver,
18 messages_checkpoint::CheckpointContents,
19 object::{Object, Owner},
20 storage::{
21 BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, error::Error as StorageError,
22 },
23};
24use move_core_types::language_storage::StructTag;
25use rayon::iter::{IntoParallelIterator, ParallelIterator};
26use serde::{Deserialize, Serialize};
27use tracing::{debug, info};
28use typed_store::{
29 DBMapUtils, TypedStoreError,
30 rocks::{DBMap, MetricConf},
31 traits::{Map, TableSummary, TypedStoreDebug},
32};
33
34use crate::{
35 authority::{
36 AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore,
37 authority_store_tables::LiveObject,
38 },
39 checkpoints::CheckpointStore,
40};
41
/// Schema version written into the `meta` table once initialization succeeds.
///
/// When the version stored on disk differs from this value the index database is dropped and
/// rebuilt (see `RestIndexStore::new`).
const CURRENT_DB_VERSION: u64 = 0;
43
/// Value stored in the single-row `meta` table of the index database.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Schema version the tables were populated with; compared against `CURRENT_DB_VERSION`.
    version: u64,
}
49
50#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
51pub struct OwnerIndexKey {
52 pub owner: IotaAddress,
53 pub object_id: ObjectID,
54}
55
56impl OwnerIndexKey {
57 fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
58 Self { owner, object_id }
59 }
60}
61
62#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
63pub struct OwnerIndexInfo {
64 pub version: SequenceNumber,
66 pub type_: MoveObjectType,
67}
68
69impl OwnerIndexInfo {
70 pub fn new(object: &Object) -> Self {
71 Self {
72 version: object.version(),
73 type_: object.type_().expect("packages cannot be owned").to_owned(),
74 }
75 }
76}
77
/// Value of the `transactions` table.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TransactionInfo {
    /// Sequence number of the checkpoint whose contents list the transaction.
    pub checkpoint: u64,
}
82
/// Key of the `coin` table: the coin's type tag.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    /// Coin type extracted from a `CoinMetadata` or `TreasuryCap` object type.
    coin_type: StructTag,
}
87
88#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
89pub struct CoinIndexInfo {
90 pub coin_metadata_object_id: Option<ObjectID>,
91 pub treasury_object_id: Option<ObjectID>,
92}
93
94impl CoinIndexInfo {
95 fn merge(self, other: Self) -> Self {
96 Self {
97 coin_metadata_object_id: self
98 .coin_metadata_object_id
99 .or(other.coin_metadata_object_id),
100 treasury_object_id: self.treasury_object_id.or(other.treasury_object_id),
101 }
102 }
103}
104
/// The set of tables that make up the REST index database.
///
/// `DBMapUtils` generates the table-opening helpers used by `IndexStoreTables::open`.
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Single row keyed by `()` recording the schema version the tables were built with.
    meta: DBMap<(), MetadataInfo>,

    /// Transaction digest -> checkpoint that contains it. Rows are removed by `prune`.
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// (owner address, object ID) -> version and type of each address-owned object.
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// (parent, field object ID) -> decoded information about each dynamic field.
    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,

    /// Coin type -> IDs of the coin's metadata and treasury objects.
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
}
152
153impl IndexStoreTables {
154 fn open<P: Into<PathBuf>>(path: P) -> Self {
155 IndexStoreTables::open_tables_read_write(
156 path.into(),
157 MetricConf::new("rest-index"),
158 None,
159 None,
160 )
161 }
162
163 fn needs_to_do_initialization(&self) -> bool {
164 match self.meta.get(&()) {
165 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
166 Ok(None) => true,
167 Err(_) => true,
168 }
169 }
170
171 fn needs_to_delete_old_db(&self) -> bool {
172 match self.meta.get(&()) {
173 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
174 Ok(None) => false,
175 Err(_) => true,
176 }
177 }
178
179 fn init(
180 &mut self,
181 authority_store: &AuthorityStore,
182 checkpoint_store: &CheckpointStore,
183 epoch_store: &AuthorityPerEpochStore,
184 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
185 ) -> Result<(), StorageError> {
186 info!("Initializing REST indexes");
187
188 if let Some(highest_executed_checkpoint) =
191 checkpoint_store.get_highest_executed_checkpoint_seq_number()?
192 {
193 let lowest_available_checkpoint = checkpoint_store
194 .get_highest_pruned_checkpoint_seq_number()?
195 .saturating_add(1);
196
197 let checkpoint_range = lowest_available_checkpoint..=highest_executed_checkpoint;
198
199 info!(
200 "Indexing {} checkpoints in range {checkpoint_range:?}",
201 checkpoint_range.size_hint().0
202 );
203 let start_time = Instant::now();
204
205 checkpoint_range.into_par_iter().try_for_each(|seq| {
206 let checkpoint = checkpoint_store
207 .get_checkpoint_by_sequence_number(seq)?
208 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
209 let contents = checkpoint_store
210 .get_checkpoint_contents(&checkpoint.content_digest)?
211 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
212
213 let info = TransactionInfo {
214 checkpoint: checkpoint.sequence_number,
215 };
216
217 self.transactions
218 .multi_insert(contents.iter().map(|digests| (digests.transaction, info)))
219 .map_err(StorageError::from)
220 })?;
221
222 info!(
223 "Indexing checkpoints took {} seconds",
224 start_time.elapsed().as_secs()
225 );
226 }
227
228 let coin_index = Mutex::new(HashMap::new());
229
230 info!("Indexing Live Object Set");
231 let start_time = Instant::now();
232 std::thread::scope(|s| -> Result<(), StorageError> {
233 let mut threads = Vec::new();
234 const BITS: u8 = 5;
235 for index in 0u8..(1 << BITS) {
236 let this = &self;
237 let coin_index = &coin_index;
238 threads.push(s.spawn(move || {
239 this.live_object_set_index_task(
240 index,
241 BITS,
242 authority_store,
243 coin_index,
244 epoch_store,
245 package_store,
246 )
247 }));
248 }
249
250 for thread in threads {
252 thread.join().unwrap()?;
253 }
254
255 Ok(())
256 })?;
257
258 self.coin.multi_insert(coin_index.into_inner().unwrap())?;
259
260 info!(
261 "Indexing Live Object Set took {} seconds",
262 start_time.elapsed().as_secs()
263 );
264
265 self.meta.insert(
266 &(),
267 &MetadataInfo {
268 version: CURRENT_DB_VERSION,
269 },
270 )?;
271
272 info!("Finished initializing REST indexes");
273
274 Ok(())
275 }
276
277 fn live_object_set_index_task(
278 &self,
279 task_id: u8,
280 bits: u8,
281 authority_store: &AuthorityStore,
282 coin_index: &Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
283 epoch_store: &AuthorityPerEpochStore,
284 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
285 ) -> Result<(), StorageError> {
286 let mut id_bytes = [0; ObjectID::LENGTH];
287 id_bytes[0] = task_id << (8 - bits);
288 let start_id = ObjectID::new(id_bytes);
289
290 id_bytes[0] |= (1 << (8 - bits)) - 1;
291 for element in id_bytes.iter_mut().skip(1) {
292 *element = u8::MAX;
293 }
294 let end_id = ObjectID::new(id_bytes);
295
296 let mut resolver = epoch_store
297 .executor()
298 .type_layout_resolver(Box::new(package_store));
299 let mut batch = self.owner.batch();
300 let mut object_scanned: u64 = 0;
301 for object in authority_store
302 .perpetual_tables
303 .range_iter_live_object_set(Some(start_id), Some(end_id))
304 .filter_map(LiveObject::to_normal)
305 {
306 object_scanned += 1;
307 if object_scanned % 2_000_000 == 0 {
308 info!(
309 "[Index] Task {}: object scanned: {}",
310 task_id, object_scanned
311 );
312 }
313 match object.owner {
314 Owner::AddressOwner(owner) => {
316 let owner_key = OwnerIndexKey::new(owner, object.id());
317 let owner_info = OwnerIndexInfo::new(&object);
318 batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
319 }
320
321 Owner::ObjectOwner(parent) => {
323 if let Some(field_info) =
324 try_create_dynamic_field_info(&object, resolver.as_mut())?
325 {
326 let field_key = DynamicFieldKey::new(parent, object.id());
327
328 batch.insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
329 }
330 }
331
332 Owner::Shared { .. } | Owner::Immutable => {}
333 }
334
335 if let Some((key, value)) = try_create_coin_index_info(&object) {
337 use std::collections::hash_map::Entry;
338
339 match coin_index.lock().unwrap().entry(key) {
340 Entry::Occupied(o) => {
341 let (key, v) = o.remove_entry();
342 let value = value.merge(v);
343 batch.insert_batch(&self.coin, [(key, value)])?;
344 }
345 Entry::Vacant(v) => {
346 v.insert(value);
347 }
348 }
349 }
350
351 if batch.size_in_bytes() >= 1 << 28 {
354 batch.write()?;
355 batch = self.owner.batch();
356 }
357 }
358
359 batch.write()?;
360 Ok(())
361 }
362
363 fn prune(
365 &self,
366 checkpoint_contents_to_prune: &[CheckpointContents],
367 ) -> Result<(), TypedStoreError> {
368 let mut batch = self.transactions.batch();
369
370 let transactions_to_prune = checkpoint_contents_to_prune
371 .iter()
372 .flat_map(|contents| contents.iter().map(|digests| digests.transaction));
373
374 batch.delete_batch(&self.transactions, transactions_to_prune)?;
375
376 batch.write()
377 }
378
379 fn index_checkpoint(
381 &self,
382 checkpoint: &CheckpointData,
383 resolver: &mut dyn LayoutResolver,
384 ) -> Result<(), StorageError> {
385 debug!(
386 checkpoint = checkpoint.checkpoint_summary.sequence_number,
387 "indexing checkpoint"
388 );
389
390 let mut batch = self.transactions.batch();
391
392 {
394 let info = TransactionInfo {
395 checkpoint: checkpoint.checkpoint_summary.sequence_number,
396 };
397
398 batch.insert_batch(
399 &self.transactions,
400 checkpoint
401 .checkpoint_contents
402 .iter()
403 .map(|digests| (digests.transaction, info)),
404 )?;
405 }
406
407 {
409 let mut coin_index = HashMap::new();
410
411 for tx in &checkpoint.transactions {
412 for removed_object in tx.removed_objects_pre_version() {
414 match removed_object.owner() {
415 Owner::AddressOwner(address) => {
416 let owner_key = OwnerIndexKey::new(*address, removed_object.id());
417 batch.delete_batch(&self.owner, [owner_key])?;
418 }
419 Owner::ObjectOwner(object_id) => {
420 batch.delete_batch(
421 &self.dynamic_field,
422 [DynamicFieldKey::new(*object_id, removed_object.id())],
423 )?;
424 }
425 Owner::Shared { .. } | Owner::Immutable => {}
426 }
427 }
428
429 for (object, old_object) in tx.changed_objects() {
431 if let Some(old_object) = old_object {
432 if old_object.owner() != object.owner() {
433 match old_object.owner() {
434 Owner::AddressOwner(address) => {
435 let owner_key = OwnerIndexKey::new(*address, old_object.id());
436 batch.delete_batch(&self.owner, [owner_key])?;
437 }
438
439 Owner::ObjectOwner(object_id) => {
440 batch.delete_batch(
441 &self.dynamic_field,
442 [DynamicFieldKey::new(*object_id, old_object.id())],
443 )?;
444 }
445
446 Owner::Shared { .. } | Owner::Immutable => {}
447 }
448 }
449 }
450
451 match object.owner() {
452 Owner::AddressOwner(owner) => {
453 let owner_key = OwnerIndexKey::new(*owner, object.id());
454 let owner_info = OwnerIndexInfo::new(object);
455 batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
456 }
457 Owner::ObjectOwner(parent) => {
458 if let Some(field_info) =
459 try_create_dynamic_field_info(object, resolver)?
460 {
461 let field_key = DynamicFieldKey::new(*parent, object.id());
462
463 batch
464 .insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
465 }
466 }
467 Owner::Shared { .. } | Owner::Immutable => {}
468 }
469 }
470
471 for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
478 use std::collections::hash_map::Entry;
479
480 match coin_index.entry(key) {
481 Entry::Occupied(o) => {
482 let (key, v) = o.remove_entry();
483 let value = value.merge(v);
484 batch.insert_batch(&self.coin, [(key, value)])?;
485 }
486 Entry::Vacant(v) => {
487 v.insert(value);
488 }
489 }
490 }
491 }
492
493 batch.insert_batch(&self.coin, coin_index)?;
494 }
495
496 batch.write()?;
497
498 debug!(
499 checkpoint = checkpoint.checkpoint_summary.sequence_number,
500 "finished indexing checkpoint"
501 );
502 Ok(())
503 }
504
    /// Looks up `digest` in the `transactions` table.
    ///
    /// Returns `Ok(None)` when the table has no row for the digest.
    fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.transactions.get(digest)
    }
511
512 fn owner_iter(
513 &self,
514 owner: IotaAddress,
515 cursor: Option<ObjectID>,
516 ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
517 let lower_bound = OwnerIndexKey::new(owner, ObjectID::ZERO);
518 let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
519 let mut iter = self
520 .owner
521 .iter_with_bounds(Some(lower_bound), Some(upper_bound));
522
523 if let Some(cursor) = cursor {
524 iter = iter.skip_to(&OwnerIndexKey::new(owner, cursor))?;
525 }
526
527 Ok(iter)
528 }
529
530 fn dynamic_field_iter(
531 &self,
532 parent: ObjectID,
533 cursor: Option<ObjectID>,
534 ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
535 {
536 let lower_bound = DynamicFieldKey::new(parent, ObjectID::ZERO);
537 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
538 let mut iter = self
539 .dynamic_field
540 .iter_with_bounds(Some(lower_bound), Some(upper_bound));
541
542 if let Some(cursor) = cursor {
543 iter = iter.skip_to(&DynamicFieldKey::new(parent, cursor))?;
544 }
545
546 Ok(iter)
547 }
548
549 fn get_coin_info(
550 &self,
551 coin_type: &StructTag,
552 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
553 let key = CoinIndexKey {
554 coin_type: coin_type.to_owned(),
555 };
556 self.coin.get(&key)
557 }
558}
559
/// Public handle to the REST index database.
///
/// Wraps the private `IndexStoreTables` and exposes indexing, pruning and lookup operations.
pub struct RestIndexStore {
    /// The underlying index tables.
    tables: IndexStoreTables,
}
563
564impl RestIndexStore {
565 pub fn new(
566 path: PathBuf,
567 authority_store: &AuthorityStore,
568 checkpoint_store: &CheckpointStore,
569 epoch_store: &AuthorityPerEpochStore,
570 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
571 ) -> Self {
572 let tables = {
573 let tables = IndexStoreTables::open(&path);
574
575 if tables.needs_to_do_initialization() {
578 let mut tables = if tables.needs_to_delete_old_db() {
579 drop(tables);
580 typed_store::rocks::safe_drop_db(path.clone())
581 .expect("unable to destroy old rest-index db");
582 IndexStoreTables::open(path)
583 } else {
584 tables
585 };
586
587 tables
588 .init(
589 authority_store,
590 checkpoint_store,
591 epoch_store,
592 package_store,
593 )
594 .expect("unable to initialize rest index from live object set");
595 tables
596 } else {
597 tables
598 }
599 };
600
601 Self { tables }
602 }
603
604 pub fn new_without_init(path: PathBuf) -> Self {
605 let tables = IndexStoreTables::open(path);
606
607 Self { tables }
608 }
609
610 pub fn prune(
611 &self,
612 checkpoint_contents_to_prune: &[CheckpointContents],
613 ) -> Result<(), TypedStoreError> {
614 self.tables.prune(checkpoint_contents_to_prune)
615 }
616
617 pub fn index_checkpoint(
618 &self,
619 checkpoint: &CheckpointData,
620 resolver: &mut dyn LayoutResolver,
621 ) -> Result<(), StorageError> {
622 self.tables.index_checkpoint(checkpoint, resolver)
623 }
624
625 pub fn get_transaction_info(
626 &self,
627 digest: &TransactionDigest,
628 ) -> Result<Option<TransactionInfo>, TypedStoreError> {
629 self.tables.get_transaction_info(digest)
630 }
631
632 pub fn owner_iter(
633 &self,
634 owner: IotaAddress,
635 cursor: Option<ObjectID>,
636 ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
637 self.tables.owner_iter(owner, cursor)
638 }
639
640 pub fn dynamic_field_iter(
641 &self,
642 parent: ObjectID,
643 cursor: Option<ObjectID>,
644 ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
645 {
646 self.tables.dynamic_field_iter(parent, cursor)
647 }
648
649 pub fn get_coin_info(
650 &self,
651 coin_type: &StructTag,
652 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
653 self.tables.get_coin_info(coin_type)
654 }
655}
656
657fn try_create_dynamic_field_info(
658 object: &Object,
659 resolver: &mut dyn LayoutResolver,
660) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
661 let Some(move_object) = object.data.try_as_move() else {
663 return Ok(None);
664 };
665
666 if !move_object.type_().is_dynamic_field() {
675 return Ok(None);
676 }
677
678 let layout = resolver
679 .get_annotated_layout(&move_object.type_().clone().into())
680 .map_err(StorageError::custom)?
681 .into_layout();
682
683 let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
684 .map_err(StorageError::custom)?;
685
686 let value_metadata = field.value_metadata().map_err(StorageError::custom)?;
687
688 Ok(Some(DynamicFieldIndexInfo {
689 name_type: field.name_layout.into(),
690 name_value: field.name_bytes.to_owned(),
691 dynamic_field_type: field.kind,
692 dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
693 Some(id)
694 } else {
695 None
696 },
697 }))
698}
699
700fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
701 use iota_types::coin::{CoinMetadata, TreasuryCap};
702
703 object
704 .type_()
705 .and_then(MoveObjectType::other)
706 .and_then(|object_type| {
707 CoinMetadata::is_coin_metadata_with_coin_type(object_type)
708 .cloned()
709 .map(|coin_type| {
710 (
711 CoinIndexKey { coin_type },
712 CoinIndexInfo {
713 coin_metadata_object_id: Some(object.id()),
714 treasury_object_id: None,
715 },
716 )
717 })
718 .or_else(|| {
719 TreasuryCap::is_treasury_with_coin_type(object_type)
720 .cloned()
721 .map(|coin_type| {
722 (
723 CoinIndexKey { coin_type },
724 CoinIndexInfo {
725 coin_metadata_object_id: None,
726 treasury_object_id: Some(object.id()),
727 },
728 )
729 })
730 })
731 })
732}