1use std::{
6 collections::HashMap,
7 path::PathBuf,
8 sync::{Arc, Mutex},
9 time::Instant,
10};
11
12use iota_types::{
13 base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
14 digests::TransactionDigest,
15 dynamic_field::visitor as DFV,
16 full_checkpoint_content::CheckpointData,
17 layout_resolver::LayoutResolver,
18 messages_checkpoint::CheckpointContents,
19 object::{Object, Owner},
20 storage::{
21 BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, error::Error as StorageError,
22 },
23};
24use move_core_types::language_storage::StructTag;
25use rayon::iter::{IntoParallelIterator, ParallelIterator};
26use serde::{Deserialize, Serialize};
27use tracing::{debug, info};
28use typed_store::{
29 DBMapUtils, TypedStoreError,
30 rocks::{DBMap, MetricConf},
31 traits::{Map, TableSummary, TypedStoreDebug},
32};
33
34use crate::{
35 authority::{
36 AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore,
37 authority_store_tables::LiveObject,
38 },
39 checkpoints::CheckpointStore,
40};
41
42const CURRENT_DB_VERSION: u64 = 0;
43
44#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
45struct MetadataInfo {
46 version: u64,
48}
49
50#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
51pub struct OwnerIndexKey {
52 pub owner: IotaAddress,
53 pub object_id: ObjectID,
54}
55
56impl OwnerIndexKey {
57 fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
58 Self { owner, object_id }
59 }
60}
61
62#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
63pub struct OwnerIndexInfo {
64 pub version: SequenceNumber,
66 pub type_: MoveObjectType,
67}
68
69impl OwnerIndexInfo {
70 pub fn new(object: &Object) -> Self {
71 Self {
72 version: object.version(),
73 type_: object.type_().expect("packages cannot be owned").to_owned(),
74 }
75 }
76}
77
78#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
79pub struct TransactionInfo {
80 pub checkpoint: u64,
81}
82
83#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
84pub struct CoinIndexKey {
85 coin_type: StructTag,
86}
87
88#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
89pub struct CoinIndexInfo {
90 pub coin_metadata_object_id: Option<ObjectID>,
91 pub treasury_object_id: Option<ObjectID>,
92}
93
94impl CoinIndexInfo {
95 fn merge(self, other: Self) -> Self {
96 Self {
97 coin_metadata_object_id: self
98 .coin_metadata_object_id
99 .or(other.coin_metadata_object_id),
100 treasury_object_id: self.treasury_object_id.or(other.treasury_object_id),
101 }
102 }
103}
104
105#[derive(DBMapUtils)]
115struct IndexStoreTables {
116 meta: DBMap<(), MetadataInfo>,
124
125 transactions: DBMap<TransactionDigest, TransactionInfo>,
130
131 owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,
136
137 dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,
142
143 coin: DBMap<CoinIndexKey, CoinIndexInfo>,
148 }
152
153impl IndexStoreTables {
154 fn open<P: Into<PathBuf>>(path: P) -> Self {
155 IndexStoreTables::open_tables_read_write(
156 path.into(),
157 MetricConf::new("rest-index"),
158 None,
159 None,
160 )
161 }
162
163 fn needs_to_do_initialization(&self) -> bool {
164 match self.meta.get(&()) {
165 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
166 Ok(None) => true,
167 Err(_) => true,
168 }
169 }
170
171 fn needs_to_delete_old_db(&self) -> bool {
172 match self.meta.get(&()) {
173 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
174 Ok(None) => false,
175 Err(_) => true,
176 }
177 }
178
179 fn init(
180 &mut self,
181 authority_store: &AuthorityStore,
182 checkpoint_store: &CheckpointStore,
183 epoch_store: &AuthorityPerEpochStore,
184 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
185 ) -> Result<(), StorageError> {
186 info!("Initializing REST indexes");
187
188 if let Some(highest_executed_checkpoint) =
191 checkpoint_store.get_highest_executed_checkpoint_seq_number()?
192 {
193 let lowest_available_checkpoint = checkpoint_store
194 .get_highest_pruned_checkpoint_seq_number()?
195 .saturating_add(1);
196
197 let checkpoint_range = lowest_available_checkpoint..=highest_executed_checkpoint;
198
199 info!(
200 "Indexing {} checkpoints in range {checkpoint_range:?}",
201 checkpoint_range.size_hint().0
202 );
203 let start_time = Instant::now();
204
205 checkpoint_range.into_par_iter().try_for_each(|seq| {
206 let checkpoint = checkpoint_store
207 .get_checkpoint_by_sequence_number(seq)?
208 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
209 let contents = checkpoint_store
210 .get_checkpoint_contents(&checkpoint.content_digest)?
211 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
212
213 let info = TransactionInfo {
214 checkpoint: checkpoint.sequence_number,
215 };
216
217 self.transactions
218 .multi_insert(contents.iter().map(|digests| (digests.transaction, info)))
219 .map_err(StorageError::from)
220 })?;
221
222 info!(
223 "Indexing checkpoints took {} seconds",
224 start_time.elapsed().as_secs()
225 );
226 }
227
228 let coin_index = Mutex::new(HashMap::new());
229
230 info!("Indexing Live Object Set");
231 let start_time = Instant::now();
232 std::thread::scope(|s| -> Result<(), StorageError> {
233 let mut threads = Vec::new();
234 const BITS: u8 = 5;
235 for index in 0u8..(1 << BITS) {
236 let this = &self;
237 let coin_index = &coin_index;
238 threads.push(s.spawn(move || {
239 this.live_object_set_index_task(
240 index,
241 BITS,
242 authority_store,
243 coin_index,
244 epoch_store,
245 package_store,
246 )
247 }));
248 }
249
250 for thread in threads {
252 thread.join().unwrap()?;
253 }
254
255 Ok(())
256 })?;
257
258 self.coin.multi_insert(coin_index.into_inner().unwrap())?;
259
260 info!(
261 "Indexing Live Object Set took {} seconds",
262 start_time.elapsed().as_secs()
263 );
264
265 self.meta.insert(
266 &(),
267 &MetadataInfo {
268 version: CURRENT_DB_VERSION,
269 },
270 )?;
271
272 info!("Finished initializing REST indexes");
273
274 Ok(())
275 }
276
277 fn live_object_set_index_task(
278 &self,
279 task_id: u8,
280 bits: u8,
281 authority_store: &AuthorityStore,
282 coin_index: &Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
283 epoch_store: &AuthorityPerEpochStore,
284 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
285 ) -> Result<(), StorageError> {
286 let mut id_bytes = [0; ObjectID::LENGTH];
287 id_bytes[0] = task_id << (8 - bits);
288 let start_id = ObjectID::new(id_bytes);
289
290 id_bytes[0] |= (1 << (8 - bits)) - 1;
291 for element in id_bytes.iter_mut().skip(1) {
292 *element = u8::MAX;
293 }
294 let end_id = ObjectID::new(id_bytes);
295
296 let mut resolver = epoch_store
297 .executor()
298 .type_layout_resolver(Box::new(package_store));
299 let mut batch = self.owner.batch();
300 let mut object_scanned: u64 = 0;
301 for object in authority_store
302 .perpetual_tables
303 .range_iter_live_object_set(Some(start_id), Some(end_id))
304 .filter_map(LiveObject::to_normal)
305 {
306 object_scanned += 1;
307 if object_scanned % 2_000_000 == 0 {
308 info!(
309 "[Index] Task {}: object scanned: {}",
310 task_id, object_scanned
311 );
312 }
313 match object.owner {
314 Owner::AddressOwner(owner) => {
316 let owner_key = OwnerIndexKey::new(owner, object.id());
317 let owner_info = OwnerIndexInfo::new(&object);
318 batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
319 }
320
321 Owner::ObjectOwner(parent) => {
323 if let Some(field_info) =
324 try_create_dynamic_field_info(&object, resolver.as_mut())?
325 {
326 let field_key = DynamicFieldKey::new(parent, object.id());
327
328 batch.insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
329 }
330 }
331
332 Owner::Shared { .. } | Owner::Immutable => {}
333 }
334
335 if let Some((key, value)) = try_create_coin_index_info(&object) {
337 use std::collections::hash_map::Entry;
338
339 match coin_index.lock().unwrap().entry(key) {
340 Entry::Occupied(o) => {
341 let (key, v) = o.remove_entry();
342 let value = value.merge(v);
343 batch.insert_batch(&self.coin, [(key, value)])?;
344 }
345 Entry::Vacant(v) => {
346 v.insert(value);
347 }
348 }
349 }
350
351 if batch.size_in_bytes() >= 1 << 28 {
354 batch.write()?;
355 batch = self.owner.batch();
356 }
357 }
358
359 batch.write()?;
360 Ok(())
361 }
362
363 fn prune(
365 &self,
366 checkpoint_contents_to_prune: &[CheckpointContents],
367 ) -> Result<(), TypedStoreError> {
368 let mut batch = self.transactions.batch();
369
370 let transactions_to_prune = checkpoint_contents_to_prune
371 .iter()
372 .flat_map(|contents| contents.iter().map(|digests| digests.transaction));
373
374 batch.delete_batch(&self.transactions, transactions_to_prune)?;
375
376 batch.write()
377 }
378
379 fn index_checkpoint(
381 &self,
382 checkpoint: &CheckpointData,
383 resolver: &mut dyn LayoutResolver,
384 ) -> Result<(), StorageError> {
385 debug!(
386 checkpoint = checkpoint.checkpoint_summary.sequence_number,
387 "indexing checkpoint"
388 );
389
390 let mut batch = self.transactions.batch();
391
392 {
394 let info = TransactionInfo {
395 checkpoint: checkpoint.checkpoint_summary.sequence_number,
396 };
397
398 batch.insert_batch(
399 &self.transactions,
400 checkpoint
401 .checkpoint_contents
402 .iter()
403 .map(|digests| (digests.transaction, info)),
404 )?;
405 }
406
407 {
409 let mut coin_index = HashMap::new();
410
411 for tx in &checkpoint.transactions {
412 for removed_object in tx.removed_objects_pre_version() {
414 match removed_object.owner() {
415 Owner::AddressOwner(address) => {
416 let owner_key = OwnerIndexKey::new(*address, removed_object.id());
417 batch.delete_batch(&self.owner, [owner_key])?;
418 }
419 Owner::ObjectOwner(object_id) => {
420 batch.delete_batch(
421 &self.dynamic_field,
422 [DynamicFieldKey::new(*object_id, removed_object.id())],
423 )?;
424 }
425 Owner::Shared { .. } | Owner::Immutable => {}
426 }
427 }
428
429 for (object, old_object) in tx.changed_objects() {
431 if let Some(old_object) = old_object {
432 if old_object.owner() != object.owner() {
433 match old_object.owner() {
434 Owner::AddressOwner(address) => {
435 let owner_key = OwnerIndexKey::new(*address, old_object.id());
436 batch.delete_batch(&self.owner, [owner_key])?;
437 }
438
439 Owner::ObjectOwner(object_id) => {
440 batch.delete_batch(
441 &self.dynamic_field,
442 [DynamicFieldKey::new(*object_id, old_object.id())],
443 )?;
444 }
445
446 Owner::Shared { .. } | Owner::Immutable => {}
447 }
448 }
449 }
450
451 match object.owner() {
452 Owner::AddressOwner(owner) => {
453 let owner_key = OwnerIndexKey::new(*owner, object.id());
454 let owner_info = OwnerIndexInfo::new(object);
455 batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
456 }
457 Owner::ObjectOwner(parent) => {
458 if let Some(field_info) =
459 try_create_dynamic_field_info(object, resolver)
460 .ok()
461 .flatten()
462 {
463 let field_key = DynamicFieldKey::new(*parent, object.id());
464
465 batch
466 .insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
467 }
468 }
469 Owner::Shared { .. } | Owner::Immutable => {}
470 }
471 }
472
473 for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
480 use std::collections::hash_map::Entry;
481
482 match coin_index.entry(key) {
483 Entry::Occupied(o) => {
484 let (key, v) = o.remove_entry();
485 let value = value.merge(v);
486 batch.insert_batch(&self.coin, [(key, value)])?;
487 }
488 Entry::Vacant(v) => {
489 v.insert(value);
490 }
491 }
492 }
493 }
494
495 batch.insert_batch(&self.coin, coin_index)?;
496 }
497
498 batch.write()?;
499
500 debug!(
501 checkpoint = checkpoint.checkpoint_summary.sequence_number,
502 "finished indexing checkpoint"
503 );
504 Ok(())
505 }
506
507 fn get_transaction_info(
508 &self,
509 digest: &TransactionDigest,
510 ) -> Result<Option<TransactionInfo>, TypedStoreError> {
511 self.transactions.get(digest)
512 }
513
514 fn owner_iter(
515 &self,
516 owner: IotaAddress,
517 cursor: Option<ObjectID>,
518 ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
519 let lower_bound = OwnerIndexKey::new(owner, ObjectID::ZERO);
520 let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
521 let mut iter = self
522 .owner
523 .iter_with_bounds(Some(lower_bound), Some(upper_bound));
524
525 if let Some(cursor) = cursor {
526 iter = iter.skip_to(&OwnerIndexKey::new(owner, cursor))?;
527 }
528
529 Ok(iter)
530 }
531
532 fn dynamic_field_iter(
533 &self,
534 parent: ObjectID,
535 cursor: Option<ObjectID>,
536 ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
537 {
538 let lower_bound = DynamicFieldKey::new(parent, ObjectID::ZERO);
539 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
540 let mut iter = self
541 .dynamic_field
542 .iter_with_bounds(Some(lower_bound), Some(upper_bound));
543
544 if let Some(cursor) = cursor {
545 iter = iter.skip_to(&DynamicFieldKey::new(parent, cursor))?;
546 }
547
548 Ok(iter)
549 }
550
551 fn get_coin_info(
552 &self,
553 coin_type: &StructTag,
554 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
555 let key = CoinIndexKey {
556 coin_type: coin_type.to_owned(),
557 };
558 self.coin.get(&key)
559 }
560}
561
562pub struct RestIndexStore {
563 tables: IndexStoreTables,
564}
565
566impl RestIndexStore {
567 pub fn new(
568 path: PathBuf,
569 authority_store: &AuthorityStore,
570 checkpoint_store: &CheckpointStore,
571 epoch_store: &AuthorityPerEpochStore,
572 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
573 ) -> Self {
574 let tables = {
575 let tables = IndexStoreTables::open(&path);
576
577 if tables.needs_to_do_initialization() {
580 let mut tables = if tables.needs_to_delete_old_db() {
581 drop(tables);
582 typed_store::rocks::safe_drop_db(path.clone())
583 .expect("unable to destroy old rest-index db");
584 IndexStoreTables::open(path)
585 } else {
586 tables
587 };
588
589 tables
590 .init(
591 authority_store,
592 checkpoint_store,
593 epoch_store,
594 package_store,
595 )
596 .expect("unable to initialize rest index from live object set");
597 tables
598 } else {
599 tables
600 }
601 };
602
603 Self { tables }
604 }
605
606 pub fn new_without_init(path: PathBuf) -> Self {
607 let tables = IndexStoreTables::open(path);
608
609 Self { tables }
610 }
611
612 pub fn prune(
613 &self,
614 checkpoint_contents_to_prune: &[CheckpointContents],
615 ) -> Result<(), TypedStoreError> {
616 self.tables.prune(checkpoint_contents_to_prune)
617 }
618
619 pub fn index_checkpoint(
620 &self,
621 checkpoint: &CheckpointData,
622 resolver: &mut dyn LayoutResolver,
623 ) -> Result<(), StorageError> {
624 self.tables.index_checkpoint(checkpoint, resolver)
625 }
626
627 pub fn get_transaction_info(
628 &self,
629 digest: &TransactionDigest,
630 ) -> Result<Option<TransactionInfo>, TypedStoreError> {
631 self.tables.get_transaction_info(digest)
632 }
633
634 pub fn owner_iter(
635 &self,
636 owner: IotaAddress,
637 cursor: Option<ObjectID>,
638 ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
639 self.tables.owner_iter(owner, cursor)
640 }
641
642 pub fn dynamic_field_iter(
643 &self,
644 parent: ObjectID,
645 cursor: Option<ObjectID>,
646 ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
647 {
648 self.tables.dynamic_field_iter(parent, cursor)
649 }
650
651 pub fn get_coin_info(
652 &self,
653 coin_type: &StructTag,
654 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
655 self.tables.get_coin_info(coin_type)
656 }
657}
658
659fn try_create_dynamic_field_info(
660 object: &Object,
661 resolver: &mut dyn LayoutResolver,
662) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
663 let Some(move_object) = object.data.try_as_move() else {
665 return Ok(None);
666 };
667
668 if !move_object.type_().is_dynamic_field() {
677 return Ok(None);
678 }
679
680 let layout = resolver
681 .get_annotated_layout(&move_object.type_().clone().into())
682 .map_err(StorageError::custom)?
683 .into_layout();
684
685 let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
686 .map_err(StorageError::custom)?;
687
688 let value_metadata = field.value_metadata().map_err(StorageError::custom)?;
689
690 Ok(Some(DynamicFieldIndexInfo {
691 name_type: field.name_layout.into(),
692 name_value: field.name_bytes.to_owned(),
693 dynamic_field_type: field.kind,
694 dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
695 Some(id)
696 } else {
697 None
698 },
699 }))
700}
701
702fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
703 use iota_types::coin::{CoinMetadata, TreasuryCap};
704
705 object
706 .type_()
707 .and_then(MoveObjectType::other)
708 .and_then(|object_type| {
709 CoinMetadata::is_coin_metadata_with_coin_type(object_type)
710 .cloned()
711 .map(|coin_type| {
712 (
713 CoinIndexKey { coin_type },
714 CoinIndexInfo {
715 coin_metadata_object_id: Some(object.id()),
716 treasury_object_id: None,
717 },
718 )
719 })
720 .or_else(|| {
721 TreasuryCap::is_treasury_with_coin_type(object_type)
722 .cloned()
723 .map(|coin_type| {
724 (
725 CoinIndexKey { coin_type },
726 CoinIndexInfo {
727 coin_metadata_object_id: None,
728 treasury_object_id: Some(object.id()),
729 },
730 )
731 })
732 })
733 })
734}