use std::{
    collections::{BTreeMap, HashMap},
    path::PathBuf,
    sync::{Arc, Mutex},
    time::Instant,
};

use iota_types::{
    base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
    digests::TransactionDigest,
    dynamic_field::visitor as DFV,
    full_checkpoint_content::CheckpointData,
    layout_resolver::LayoutResolver,
    messages_checkpoint::CheckpointContents,
    object::{Object, Owner},
    storage::{
        BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, error::Error as StorageError,
    },
};
use move_core_types::language_storage::StructTag;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
use typed_store::{
    DBMapUtils, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{Map, TableSummary, TypedStoreDebug},
};

use crate::{
    authority::{AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::CheckpointStore,
    par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer},
};

const CURRENT_DB_VERSION: u64 = 0;

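/// Metadata record for the index store; holds the schema version that is checked
/// against [`CURRENT_DB_VERSION`] on startup to decide whether a reindex is required.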
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    version: u64,
}

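/// Key for the `owner` table: (owner address, object id). Ordering first by owner
/// keeps all objects owned by the same address adjacent, enabling range scans.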
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: IotaAddress,
    pub object_id: ObjectID,
}

impl OwnerIndexKey {
    fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
        Self { owner, object_id }
    }
}

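/// Value stored in the `owner` table: the owned object's version and Move type.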
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    pub version: SequenceNumber,
    pub type_: MoveObjectType,
}

impl OwnerIndexInfo {
    pub fn new(object: &Object) -> Self {
        Self {
            version: object.version(),
            type_: object.type_().expect("packages cannot be owned").to_owned(),
        }
    }
}

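/// Value stored in the `transactions` table: the sequence number of the checkpoint
/// that included the transaction.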
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TransactionInfo {
    pub checkpoint: u64,
}

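/// Key for the `coin` table: the coin type whose metadata and treasury objects are indexed.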
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}

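/// Value stored in the `coin` table: the object ids of the coin type's `CoinMetadata`
/// and `TreasuryCap` objects, either of which may be absent.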
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
}

impl CoinIndexInfo {
    fn merge(self, other: Self) -> Self {
        Self {
            coin_metadata_object_id: self
                .coin_metadata_object_id
                .or(other.coin_metadata_object_id),
            treasury_object_id: self.treasury_object_id.or(other.treasury_object_id),
        }
    }
}

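/// RocksDB tables backing the REST index: transaction -> checkpoint membership,
/// owner -> owned objects, parent -> dynamic fields, and coin type -> coin objects.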
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Singleton row (keyed by `()`) recording the schema version of this database.
    meta: DBMap<(), MetadataInfo>,

    /// Maps a transaction digest to the checkpoint that included it.
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// Maps (owner address, object id) to the owned object's version and type.
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// Maps (parent object id, field object id) to information about the dynamic field.
    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,

    /// Maps a coin type to the object ids of its `CoinMetadata` and `TreasuryCap` objects.
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
}

impl IndexStoreTables {
    fn open<P: Into<PathBuf>>(path: P) -> Self {
        IndexStoreTables::open_tables_read_write(
            path.into(),
            MetricConf::new("rest-index"),
            None,
            None,
        )
    }

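    /// Returns true if the index has not been initialized yet, was written with a
    /// schema version other than [`CURRENT_DB_VERSION`], or cannot be read.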
    fn needs_to_do_initialization(&self) -> bool {
        match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => true,
            Err(_) => true,
        }
    }

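    /// Returns true if an existing database was written with an old schema version
    /// (or its metadata cannot be read) and must be dropped before reindexing.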
    fn needs_to_delete_old_db(&self) -> bool {
        match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => false,
            Err(_) => true,
        }
    }

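    /// Builds the indexes from scratch: populates the transaction table from all
    /// executed, unpruned checkpoints and the owner/dynamic-field/coin tables from
    /// the live object set, then records [`CURRENT_DB_VERSION`] in the meta table.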
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Result<(), StorageError> {
        info!("Initializing REST indexes");

        // Index all executed, unpruned checkpoints: transaction digest -> checkpoint
        if let Some(highest_executed_checkpoint) =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?
        {
            let lowest_available_checkpoint = checkpoint_store
                .get_highest_pruned_checkpoint_seq_number()?
                .saturating_add(1);

            let checkpoint_range = lowest_available_checkpoint..=highest_executed_checkpoint;

            info!(
                "Indexing {} checkpoints in range {checkpoint_range:?}",
                checkpoint_range.size_hint().0
            );
            let start_time = Instant::now();

            checkpoint_range.into_par_iter().try_for_each(|seq| {
                let checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(seq)?
                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
                let contents = checkpoint_store
                    .get_checkpoint_contents(&checkpoint.content_digest)?
                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;

                let info = TransactionInfo {
                    checkpoint: checkpoint.sequence_number,
                };

                self.transactions
                    .multi_insert(contents.iter().map(|digests| (digests.transaction, info)))
                    .map_err(StorageError::from)
            })?;

            info!(
                "Indexing checkpoints took {} seconds",
                start_time.elapsed().as_secs()
            );
        }

        // Index the live object set to build the owner, dynamic field, and coin tables
        let coin_index = Mutex::new(HashMap::new());

        let make_live_object_indexer = RestParLiveObjectSetIndexer {
            tables: self,
            coin_index: &coin_index,
            epoch_store,
            package_store,
        };

        crate::par_index_live_object_set::par_index_live_object_set(
            authority_store,
            &make_live_object_indexer,
        )?;

        self.coin.multi_insert(coin_index.into_inner().unwrap())?;

        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing REST indexes");

        Ok(())
    }

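    /// Deletes the transaction index entries for the given checkpoint contents.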
    fn prune(
        &self,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.transactions.batch();

        let transactions_to_prune = checkpoint_contents_to_prune
            .iter()
            .flat_map(|contents| contents.iter().map(|digests| digests.transaction));

        batch.delete_batch(&self.transactions, transactions_to_prune)?;

        batch.write()
    }

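    /// Builds (but does not write) a batch containing all index updates for the given
    /// checkpoint: transaction membership, owner and dynamic field changes, and newly
    /// created coin metadata/treasury objects.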
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        resolver: &mut dyn LayoutResolver,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.transactions.batch();

        // transactions index
        {
            let info = TransactionInfo {
                checkpoint: checkpoint.checkpoint_summary.sequence_number,
            };

            batch.insert_batch(
                &self.transactions,
                checkpoint
                    .checkpoint_contents
                    .iter()
                    .map(|digests| (digests.transaction, info)),
            )?;
        }

        // owner, dynamic field, and coin indexes
        {
            let mut coin_index = HashMap::new();

            for tx in &checkpoint.transactions {
                // Remove index entries for objects that were deleted or wrapped
                for removed_object in tx.removed_objects_pre_version() {
                    match removed_object.owner() {
                        Owner::AddressOwner(address) => {
                            let owner_key = OwnerIndexKey::new(*address, removed_object.id());
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }
                        Owner::ObjectOwner(object_id) => {
                            batch.delete_batch(
                                &self.dynamic_field,
                                [DynamicFieldKey::new(*object_id, removed_object.id())],
                            )?;
                        }
                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                for (object, old_object) in tx.changed_objects() {
                    // If the owner changed, remove the entry indexed under the old owner
                    if let Some(old_object) = old_object {
                        if old_object.owner() != object.owner() {
                            match old_object.owner() {
                                Owner::AddressOwner(address) => {
                                    let owner_key = OwnerIndexKey::new(*address, old_object.id());
                                    batch.delete_batch(&self.owner, [owner_key])?;
                                }

                                Owner::ObjectOwner(object_id) => {
                                    batch.delete_batch(
                                        &self.dynamic_field,
                                        [DynamicFieldKey::new(*object_id, old_object.id())],
                                    )?;
                                }

                                Owner::Shared { .. } | Owner::Immutable => {}
                            }
                        }
                    }

                    match object.owner() {
                        Owner::AddressOwner(owner) => {
                            let owner_key = OwnerIndexKey::new(*owner, object.id());
                            let owner_info = OwnerIndexInfo::new(object);
                            batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                        }
                        Owner::ObjectOwner(parent) => {
                            if let Some(field_info) =
                                try_create_dynamic_field_info(object, resolver)
                                    .ok()
                                    .flatten()
                            {
                                let field_key = DynamicFieldKey::new(*parent, object.id());

                                batch
                                    .insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
                            }
                        }
                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // Pick up newly created CoinMetadata and TreasuryCap objects
                for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                    use std::collections::hash_map::Entry;

                    match coin_index.entry(key) {
                        Entry::Occupied(o) => {
                            let (key, v) = o.remove_entry();
                            let value = value.merge(v);
                            batch.insert_batch(&self.coin, [(key, value)])?;
                        }
                        Entry::Vacant(v) => {
                            v.insert(value);
                        }
                    }
                }
            }

            batch.insert_batch(&self.coin, coin_index)?;
        }

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }

    fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.transactions.get(digest)
    }

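    /// Iterates over the objects owned by `owner`, optionally resuming from the
    /// `cursor` object id (inclusive).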
    fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
        let lower_bound = OwnerIndexKey::new(owner, ObjectID::ZERO);
        let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
        let mut iter = self
            .owner
            .iter_with_bounds(Some(lower_bound), Some(upper_bound));

        if let Some(cursor) = cursor {
            iter = iter.skip_to(&OwnerIndexKey::new(owner, cursor))?;
        }

        Ok(iter)
    }

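    /// Iterates over the dynamic fields of `parent`, optionally resuming from the
    /// `cursor` field object id (inclusive).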
    fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
    {
        let lower_bound = DynamicFieldKey::new(parent, ObjectID::ZERO);
        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
        let mut iter = self
            .dynamic_field
            .iter_with_bounds(Some(lower_bound), Some(upper_bound));

        if let Some(cursor) = cursor {
            iter = iter.skip_to(&DynamicFieldKey::new(parent, cursor))?;
        }

        Ok(iter)
    }

    fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        let key = CoinIndexKey {
            coin_type: coin_type.to_owned(),
        };
        self.coin.get(&key)
    }
}

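/// Index store used by the REST API. Wraps the RocksDB-backed [`IndexStoreTables`]
/// and buffers one write batch per indexed checkpoint until it is committed.
///
/// A rough usage sketch (the checkpoint data and layout resolver are assumed to be
/// provided by the caller; not compiled as a doctest):
///
/// ```ignore
/// // Stage the index updates for a checkpoint...
/// rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
/// // ...then commit batches strictly in checkpoint sequence order.
/// rest_index.commit_update_for_checkpoint(checkpoint_data.checkpoint_summary.sequence_number)?;
/// ```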
pub struct RestIndexStore {
    tables: IndexStoreTables,
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
}

impl RestIndexStore {
    pub fn new(
        path: PathBuf,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Self {
        let tables = {
            let tables = IndexStoreTables::open(&path);

            // If the index tables are uninitialized or of an older version, they need
            // to be (re)built before the store can be used.
            if tables.needs_to_do_initialization() {
                let mut tables = if tables.needs_to_delete_old_db() {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone())
                        .expect("unable to destroy old rest-index db");
                    IndexStoreTables::open(path)
                } else {
                    tables
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                    )
                    .expect("unable to initialize rest index from live object set");
                tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
        }
    }

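    /// Opens the index tables at `path` without checking the schema version or
    /// initializing them from the live object set.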
    pub fn new_without_init(path: PathBuf) -> Self {
        let tables = IndexStoreTables::open(path);

        Self {
            tables,
            pending_updates: Default::default(),
        }
    }

    pub fn prune(
        &self,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables.prune(checkpoint_contents_to_prune)
    }

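    /// Indexes a checkpoint and stores the resulting write batch in `pending_updates`
    /// until [`Self::commit_update_for_checkpoint`] is called for its sequence number.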
    pub fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        resolver: &mut dyn LayoutResolver,
    ) -> Result<(), StorageError> {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self.tables.index_checkpoint(checkpoint, resolver)?;

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);

        Ok(())
    }

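    /// Writes the oldest pending batch to the database. Must be called in checkpoint
    /// sequence order; panics if `checkpoint` does not match the oldest pending batch.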
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }

    pub fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.tables.get_transaction_info(digest)
    }

    pub fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
        self.tables.owner_iter(owner, cursor)
    }

    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }
}

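/// If `object` is a dynamic field (`Field<Name, Value>`), resolves its Move layout and
/// extracts the name and value metadata needed for the dynamic field index; returns
/// `Ok(None)` for any other object.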
fn try_create_dynamic_field_info(
    object: &Object,
    resolver: &mut dyn LayoutResolver,
) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
    // Skip if not a move object
    let Some(move_object) = object.data.try_as_move() else {
        return Ok(None);
    };

    // Skip any objects that aren't of type `Field<Name, Value>`
    if !move_object.type_().is_dynamic_field() {
        return Ok(None);
    }

    let layout = resolver
        .get_annotated_layout(&move_object.type_().clone().into())
        .map_err(StorageError::custom)?
        .into_layout();

    let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
        .map_err(StorageError::custom)?;

    let value_metadata = field.value_metadata().map_err(StorageError::custom)?;

    Ok(Some(DynamicFieldIndexInfo {
        name_type: field.name_layout.into(),
        name_value: field.name_bytes.to_owned(),
        dynamic_field_type: field.kind,
        dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
            Some(id)
        } else {
            None
        },
    }))
}

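/// If `object` is a `CoinMetadata<T>` or `TreasuryCap<T>`, returns the coin index entry
/// mapping coin type `T` to this object's id; returns `None` otherwise.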
fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
    use iota_types::coin::{CoinMetadata, TreasuryCap};

    object
        .type_()
        .and_then(MoveObjectType::other)
        .and_then(|object_type| {
            CoinMetadata::is_coin_metadata_with_coin_type(object_type)
                .cloned()
                .map(|coin_type| {
                    (
                        CoinIndexKey { coin_type },
                        CoinIndexInfo {
                            coin_metadata_object_id: Some(object.id()),
                            treasury_object_id: None,
                        },
                    )
                })
                .or_else(|| {
                    TreasuryCap::is_treasury_with_coin_type(object_type)
                        .cloned()
                        .map(|coin_type| {
                            (
                                CoinIndexKey { coin_type },
                                CoinIndexInfo {
                                    coin_metadata_object_id: None,
                                    treasury_object_id: Some(object.id()),
                                },
                            )
                        })
                })
        })
}

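/// Factory that creates a [`RestLiveObjectIndexer`] per worker thread when the live
/// object set is indexed in parallel during initialization.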
struct RestParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    epoch_store: &'a AuthorityPerEpochStore,
    package_store: &'a Arc<dyn BackingPackageStore + Send + Sync>,
}

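/// Per-thread live object indexer: accumulates owner and dynamic field entries in a
/// local write batch and merges coin entries into the shared `coin_index` map.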
struct RestLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    batch: typed_store::rocks::DBBatch,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    resolver: Box<dyn LayoutResolver + 'a>,
}

impl<'a> ParMakeLiveObjectIndexer for RestParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RestLiveObjectIndexer<'a>;

    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RestLiveObjectIndexer {
            tables: self.tables,
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            resolver: self
                .epoch_store
                .executor()
                .type_layout_resolver(Box::new(self.package_store)),
        }
    }
}

impl LiveObjectIndexer for RestLiveObjectIndexer<'_> {
    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
        match object.owner {
            // owner index
            Owner::AddressOwner(owner) => {
                let owner_key = OwnerIndexKey::new(owner, object.id());
                let owner_info = OwnerIndexInfo::new(&object);
                self.batch
                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
            }

            // dynamic field index
            Owner::ObjectOwner(parent) => {
                if let Some(field_info) =
                    try_create_dynamic_field_info(&object, self.resolver.as_mut())?
                {
                    let field_key = DynamicFieldKey::new(parent, object.id());

                    self.batch
                        .insert_batch(&self.tables.dynamic_field, [(field_key, field_info)])?;
                }
            }

            Owner::Shared { .. } | Owner::Immutable => {}
        }

        // coin index
        if let Some((key, value)) = try_create_coin_index_info(&object) {
            use std::collections::hash_map::Entry;

            match self.coin_index.lock().unwrap().entry(key) {
                Entry::Occupied(o) => {
                    let (key, v) = o.remove_entry();
                    let value = value.merge(v);
                    self.batch.insert_batch(&self.tables.coin, [(key, value)])?;
                }
                Entry::Vacant(v) => {
                    v.insert(value);
                }
            }
        }

        // If the batch grows beyond 128 MiB, flush it to the DB so the amount of data
        // held in memory stays bounded.
        if self.batch.size_in_bytes() >= 1 << 27 {
            std::mem::replace(&mut self.batch, self.tables.owner.batch()).write()?;
        }

        Ok(())
    }

    fn finish(self) -> Result<(), StorageError> {
        self.batch.write()?;
        Ok(())
    }
}