1use std::{
6 collections::{BTreeMap, HashMap},
7 path::PathBuf,
8 sync::{Arc, Mutex},
9 time::Instant,
10};
11
12use iota_types::{
13 base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
14 digests::TransactionDigest,
15 dynamic_field::visitor as DFV,
16 full_checkpoint_content::CheckpointData,
17 layout_resolver::LayoutResolver,
18 messages_checkpoint::CheckpointContents,
19 object::{Object, Owner},
20 storage::{
21 BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, error::Error as StorageError,
22 },
23};
24use move_core_types::language_storage::StructTag;
25use rayon::iter::{IntoParallelIterator, ParallelIterator};
26use serde::{Deserialize, Serialize};
27use tracing::{debug, info};
28use typed_store::{
29 DBMapUtils, TypedStoreError,
30 rocks::{DBMap, MetricConf},
31 traits::{Map, TableSummary, TypedStoreDebug},
32};
33
34use crate::{
35 authority::{AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore},
36 checkpoints::CheckpointStore,
37 par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer},
38};
39
/// Schema version this build expects for the on-disk index.
///
/// It is compared with the `version` recorded in the `meta` table to decide
/// whether the index has to be (re)built, and it is the value `init` records
/// once indexing has completed.
const CURRENT_DB_VERSION: u64 = 0;
41
/// Record stored under the unit key of the `meta` table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Schema version the index was built with; checked against
    /// `CURRENT_DB_VERSION` when the store is opened.
    version: u64,
}
47
/// Key of the `owner` table: an owning address paired with the id of one
/// object it owns.
///
/// NOTE(review): `owner_iter` scans with bounds of the form `(owner, id)`, so
/// it presumably relies on the stored key encoding ordering by `owner` first
/// and `object_id` second - confirm before reordering the fields.
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    /// Address that owns the object.
    pub owner: IotaAddress,
    /// Id of the owned object.
    pub object_id: ObjectID,
}
53
54impl OwnerIndexKey {
55 fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
56 Self { owner, object_id }
57 }
58}
59
/// Value of the `owner` table: what is recorded about an address-owned object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    /// Version of the object at the time it was indexed.
    pub version: SequenceNumber,
    /// Move type of the object.
    pub type_: MoveObjectType,
}
66
67impl OwnerIndexInfo {
68 pub fn new(object: &Object) -> Self {
69 Self {
70 version: object.version(),
71 type_: object.type_().expect("packages cannot be owned").to_owned(),
72 }
73 }
74}
75
/// Value of the `transactions` table.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TransactionInfo {
    /// Sequence number of the checkpoint that contains the transaction.
    pub checkpoint: u64,
}
80
/// Key of the `coin` table: the coin's type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    /// Struct tag identifying the coin type.
    coin_type: StructTag,
}
85
/// Value of the `coin` table: the ids of the objects associated with a coin
/// type. Each id is `None` until the corresponding object has been seen.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    /// Id of the coin's `CoinMetadata` object, when known.
    pub coin_metadata_object_id: Option<ObjectID>,
    /// Id of the coin's `TreasuryCap` object, when known.
    pub treasury_object_id: Option<ObjectID>,
}
91
92impl CoinIndexInfo {
93 fn merge(self, other: Self) -> Self {
94 Self {
95 coin_metadata_object_id: self
96 .coin_metadata_object_id
97 .or(other.coin_metadata_object_id),
98 treasury_object_id: self.treasury_object_id.or(other.treasury_object_id),
99 }
100 }
101}
102
/// The tables that make up the REST index database.
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Single-row table (unit key) holding the [`MetadataInfo`] record.
    ///
    /// A missing row, an unreadable row, or a version other than
    /// `CURRENT_DB_VERSION` makes `needs_to_do_initialization` report that the
    /// index has to be built.
    meta: DBMap<(), MetadataInfo>,

    /// Transaction digest -> [`TransactionInfo`] (the containing checkpoint).
    ///
    /// Rows are removed again by `prune`.
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// (owner address, object id) -> [`OwnerIndexInfo`] for address-owned
    /// objects.
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// (parent, field object id) -> [`DynamicFieldIndexInfo`] for object-owned
    /// objects that could be decoded as dynamic fields.
    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,

    /// Coin type -> [`CoinIndexInfo`] (ids of the coin's metadata and treasury
    /// objects).
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
}
150
151impl IndexStoreTables {
152 fn open<P: Into<PathBuf>>(path: P) -> Self {
153 IndexStoreTables::open_tables_read_write(
154 path.into(),
155 MetricConf::new("rest-index"),
156 None,
157 None,
158 )
159 }
160
161 fn needs_to_do_initialization(&self) -> bool {
162 match self.meta.get(&()) {
163 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
164 Ok(None) => true,
165 Err(_) => true,
166 }
167 }
168
169 fn needs_to_delete_old_db(&self) -> bool {
170 match self.meta.get(&()) {
171 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
172 Ok(None) => false,
173 Err(_) => true,
174 }
175 }
176
177 fn init(
178 &mut self,
179 authority_store: &AuthorityStore,
180 checkpoint_store: &CheckpointStore,
181 epoch_store: &AuthorityPerEpochStore,
182 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
183 ) -> Result<(), StorageError> {
184 info!("Initializing REST indexes");
185
186 if let Some(highest_executed_checkpoint) =
189 checkpoint_store.get_highest_executed_checkpoint_seq_number()?
190 {
191 let lowest_available_checkpoint = checkpoint_store
192 .get_highest_pruned_checkpoint_seq_number()?
193 .saturating_add(1);
194
195 let checkpoint_range = lowest_available_checkpoint..=highest_executed_checkpoint;
196
197 info!(
198 "Indexing {} checkpoints in range {checkpoint_range:?}",
199 checkpoint_range.size_hint().0
200 );
201 let start_time = Instant::now();
202
203 checkpoint_range.into_par_iter().try_for_each(|seq| {
204 let checkpoint = checkpoint_store
205 .get_checkpoint_by_sequence_number(seq)?
206 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
207 let contents = checkpoint_store
208 .get_checkpoint_contents(&checkpoint.content_digest)?
209 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
210
211 let info = TransactionInfo {
212 checkpoint: checkpoint.sequence_number,
213 };
214
215 self.transactions
216 .multi_insert(contents.iter().map(|digests| (digests.transaction, info)))
217 .map_err(StorageError::from)
218 })?;
219
220 info!(
221 "Indexing checkpoints took {} seconds",
222 start_time.elapsed().as_secs()
223 );
224 }
225
226 let coin_index = Mutex::new(HashMap::new());
227
228 let make_live_object_indexer = RestParLiveObjectSetIndexer {
229 tables: self,
230 coin_index: &coin_index,
231 epoch_store,
232 package_store,
233 };
234
235 crate::par_index_live_object_set::par_index_live_object_set(
236 authority_store,
237 &make_live_object_indexer,
238 )?;
239
240 self.coin.multi_insert(coin_index.into_inner().unwrap())?;
241
242 self.meta.insert(
243 &(),
244 &MetadataInfo {
245 version: CURRENT_DB_VERSION,
246 },
247 )?;
248
249 info!("Finished initializing REST indexes");
250
251 Ok(())
252 }
253
254 fn prune(
256 &self,
257 checkpoint_contents_to_prune: &[CheckpointContents],
258 ) -> Result<(), TypedStoreError> {
259 let mut batch = self.transactions.batch();
260
261 let transactions_to_prune = checkpoint_contents_to_prune
262 .iter()
263 .flat_map(|contents| contents.iter().map(|digests| digests.transaction));
264
265 batch.delete_batch(&self.transactions, transactions_to_prune)?;
266
267 batch.write()
268 }
269
270 fn index_checkpoint(
272 &self,
273 checkpoint: &CheckpointData,
274 resolver: &mut dyn LayoutResolver,
275 ) -> Result<typed_store::rocks::DBBatch, StorageError> {
276 debug!(
277 checkpoint = checkpoint.checkpoint_summary.sequence_number,
278 "indexing checkpoint"
279 );
280
281 let mut batch = self.transactions.batch();
282
283 {
285 let info = TransactionInfo {
286 checkpoint: checkpoint.checkpoint_summary.sequence_number,
287 };
288
289 batch.insert_batch(
290 &self.transactions,
291 checkpoint
292 .checkpoint_contents
293 .iter()
294 .map(|digests| (digests.transaction, info)),
295 )?;
296 }
297
298 {
300 let mut coin_index = HashMap::new();
301
302 for tx in &checkpoint.transactions {
303 for removed_object in tx.removed_objects_pre_version() {
305 match removed_object.owner() {
306 Owner::AddressOwner(address) => {
307 let owner_key = OwnerIndexKey::new(*address, removed_object.id());
308 batch.delete_batch(&self.owner, [owner_key])?;
309 }
310 Owner::ObjectOwner(object_id) => {
311 batch.delete_batch(
312 &self.dynamic_field,
313 [DynamicFieldKey::new(*object_id, removed_object.id())],
314 )?;
315 }
316 Owner::Shared { .. } | Owner::Immutable => {}
317 }
318 }
319
320 for (object, old_object) in tx.changed_objects() {
322 if let Some(old_object) = old_object {
323 if old_object.owner() != object.owner() {
324 match old_object.owner() {
325 Owner::AddressOwner(address) => {
326 let owner_key = OwnerIndexKey::new(*address, old_object.id());
327 batch.delete_batch(&self.owner, [owner_key])?;
328 }
329
330 Owner::ObjectOwner(object_id) => {
331 batch.delete_batch(
332 &self.dynamic_field,
333 [DynamicFieldKey::new(*object_id, old_object.id())],
334 )?;
335 }
336
337 Owner::Shared { .. } | Owner::Immutable => {}
338 }
339 }
340 }
341
342 match object.owner() {
343 Owner::AddressOwner(owner) => {
344 let owner_key = OwnerIndexKey::new(*owner, object.id());
345 let owner_info = OwnerIndexInfo::new(object);
346 batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
347 }
348 Owner::ObjectOwner(parent) => {
349 if let Some(field_info) =
350 try_create_dynamic_field_info(object, resolver)
351 .ok()
352 .flatten()
353 {
354 let field_key = DynamicFieldKey::new(*parent, object.id());
355
356 batch
357 .insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
358 }
359 }
360 Owner::Shared { .. } | Owner::Immutable => {}
361 }
362 }
363
364 for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
371 use std::collections::hash_map::Entry;
372
373 match coin_index.entry(key) {
374 Entry::Occupied(o) => {
375 let (key, v) = o.remove_entry();
376 let value = value.merge(v);
377 batch.insert_batch(&self.coin, [(key, value)])?;
378 }
379 Entry::Vacant(v) => {
380 v.insert(value);
381 }
382 }
383 }
384 }
385
386 batch.insert_batch(&self.coin, coin_index)?;
387 }
388
389 debug!(
390 checkpoint = checkpoint.checkpoint_summary.sequence_number,
391 "finished indexing checkpoint"
392 );
393
394 Ok(batch)
395 }
396
397 fn get_transaction_info(
398 &self,
399 digest: &TransactionDigest,
400 ) -> Result<Option<TransactionInfo>, TypedStoreError> {
401 self.transactions.get(digest)
402 }
403
404 fn owner_iter(
405 &self,
406 owner: IotaAddress,
407 cursor: Option<ObjectID>,
408 ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
409 let lower_bound = OwnerIndexKey::new(owner, cursor.unwrap_or(ObjectID::ZERO));
410 let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
411 Ok(self
412 .owner
413 .iter_with_bounds(Some(lower_bound), Some(upper_bound)))
414 }
415
416 fn dynamic_field_iter(
417 &self,
418 parent: ObjectID,
419 cursor: Option<ObjectID>,
420 ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
421 {
422 let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
423 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
424 let iter = self
425 .dynamic_field
426 .iter_with_bounds(Some(lower_bound), Some(upper_bound));
427 Ok(iter)
428 }
429
430 fn get_coin_info(
431 &self,
432 coin_type: &StructTag,
433 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
434 let key = CoinIndexKey {
435 coin_type: coin_type.to_owned(),
436 };
437 self.coin.get(&key)
438 }
439}
440
/// Public handle to the REST index: the underlying tables plus the batches
/// that have been prepared but not yet written.
pub struct RestIndexStore {
    /// The index tables.
    tables: IndexStoreTables,
    /// Batches produced by `index_checkpoint`, keyed by checkpoint sequence
    /// number, waiting for `commit_update_for_checkpoint` to write them.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
}
445
446impl RestIndexStore {
447 pub fn new(
448 path: PathBuf,
449 authority_store: &AuthorityStore,
450 checkpoint_store: &CheckpointStore,
451 epoch_store: &AuthorityPerEpochStore,
452 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
453 ) -> Self {
454 let tables = {
455 let tables = IndexStoreTables::open(&path);
456
457 if tables.needs_to_do_initialization() {
460 let mut tables = if tables.needs_to_delete_old_db() {
461 drop(tables);
462 typed_store::rocks::safe_drop_db(path.clone())
463 .expect("unable to destroy old rest-index db");
464 IndexStoreTables::open(path)
465 } else {
466 tables
467 };
468
469 tables
470 .init(
471 authority_store,
472 checkpoint_store,
473 epoch_store,
474 package_store,
475 )
476 .expect("unable to initialize rest index from live object set");
477 tables
478 } else {
479 tables
480 }
481 };
482
483 Self {
484 tables,
485 pending_updates: Default::default(),
486 }
487 }
488
489 pub fn new_without_init(path: PathBuf) -> Self {
490 let tables = IndexStoreTables::open(path);
491
492 Self {
493 tables,
494 pending_updates: Default::default(),
495 }
496 }
497
498 pub fn prune(
499 &self,
500 checkpoint_contents_to_prune: &[CheckpointContents],
501 ) -> Result<(), TypedStoreError> {
502 self.tables.prune(checkpoint_contents_to_prune)
503 }
504
505 pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
510 let sequence_number = checkpoint.checkpoint_summary.sequence_number;
511 let batch = self
512 .tables
513 .index_checkpoint(checkpoint, resolver)
514 .expect("db error");
515
516 self.pending_updates
517 .lock()
518 .unwrap()
519 .insert(sequence_number, batch);
520 }
521
522 pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
530 let next_batch = self.pending_updates.lock().unwrap().pop_first();
531
532 let (next_sequence_number, batch) = next_batch.unwrap();
534 assert_eq!(
535 checkpoint, next_sequence_number,
536 "commit_update_for_checkpoint must be called in order"
537 );
538
539 Ok(batch.write()?)
540 }
541
542 pub fn get_transaction_info(
543 &self,
544 digest: &TransactionDigest,
545 ) -> Result<Option<TransactionInfo>, TypedStoreError> {
546 self.tables.get_transaction_info(digest)
547 }
548
549 pub fn owner_iter(
550 &self,
551 owner: IotaAddress,
552 cursor: Option<ObjectID>,
553 ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
554 self.tables.owner_iter(owner, cursor)
555 }
556
557 pub fn dynamic_field_iter(
558 &self,
559 parent: ObjectID,
560 cursor: Option<ObjectID>,
561 ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
562 {
563 self.tables.dynamic_field_iter(parent, cursor)
564 }
565
566 pub fn get_coin_info(
567 &self,
568 coin_type: &StructTag,
569 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
570 self.tables.get_coin_info(coin_type)
571 }
572}
573
574fn try_create_dynamic_field_info(
575 object: &Object,
576 resolver: &mut dyn LayoutResolver,
577) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
578 let Some(move_object) = object.data.try_as_move() else {
580 return Ok(None);
581 };
582
583 if !move_object.type_().is_dynamic_field() {
592 return Ok(None);
593 }
594
595 let layout = resolver
596 .get_annotated_layout(&move_object.type_().clone().into())
597 .map_err(StorageError::custom)?
598 .into_layout();
599
600 let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
601 .map_err(StorageError::custom)?;
602
603 let value_metadata = field.value_metadata().map_err(StorageError::custom)?;
604
605 Ok(Some(DynamicFieldIndexInfo {
606 name_type: field.name_layout.into(),
607 name_value: field.name_bytes.to_owned(),
608 dynamic_field_type: field.kind,
609 dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
610 Some(id)
611 } else {
612 None
613 },
614 }))
615}
616
617fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
618 use iota_types::coin::{CoinMetadata, TreasuryCap};
619
620 object
621 .type_()
622 .and_then(MoveObjectType::other)
623 .and_then(|object_type| {
624 CoinMetadata::is_coin_metadata_with_coin_type(object_type)
625 .cloned()
626 .map(|coin_type| {
627 (
628 CoinIndexKey { coin_type },
629 CoinIndexInfo {
630 coin_metadata_object_id: Some(object.id()),
631 treasury_object_id: None,
632 },
633 )
634 })
635 .or_else(|| {
636 TreasuryCap::is_treasury_with_coin_type(object_type)
637 .cloned()
638 .map(|coin_type| {
639 (
640 CoinIndexKey { coin_type },
641 CoinIndexInfo {
642 coin_metadata_object_id: None,
643 treasury_object_id: Some(object.id()),
644 },
645 )
646 })
647 })
648 })
649}
650
/// Factory for [`RestLiveObjectIndexer`]s; every indexer it creates shares
/// the same tables and coin index.
struct RestParLiveObjectSetIndexer<'a> {
    /// Tables the created indexers write to.
    tables: &'a IndexStoreTables,
    /// Coin records still waiting for their other half, shared by all created
    /// indexers.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Source of the executor used to build a layout resolver per indexer.
    epoch_store: &'a AuthorityPerEpochStore,
    /// Package store handed to each layout resolver.
    package_store: &'a Arc<dyn BackingPackageStore + Send + Sync>,
}
657
/// Indexes live objects into the REST index tables, buffering writes in a
/// batch.
struct RestLiveObjectIndexer<'a> {
    /// Tables the batch targets.
    tables: &'a IndexStoreTables,
    /// Writes staged so far; written out by `index_object` once it grows large
    /// enough, and by `finish`.
    batch: typed_store::rocks::DBBatch,
    /// Coin records still waiting for their other half.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Resolver used to decode dynamic field objects.
    resolver: Box<dyn LayoutResolver + 'a>,
}
664
665impl<'a> ParMakeLiveObjectIndexer for RestParLiveObjectSetIndexer<'a> {
666 type ObjectIndexer = RestLiveObjectIndexer<'a>;
667
668 fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
669 RestLiveObjectIndexer {
670 tables: self.tables,
671 batch: self.tables.owner.batch(),
672 coin_index: self.coin_index,
673 resolver: self
674 .epoch_store
675 .executor()
676 .type_layout_resolver(Box::new(self.package_store)),
677 }
678 }
679}
680
681impl LiveObjectIndexer for RestLiveObjectIndexer<'_> {
682 fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
683 match object.owner {
684 Owner::AddressOwner(owner) => {
686 let owner_key = OwnerIndexKey::new(owner, object.id());
687 let owner_info = OwnerIndexInfo::new(&object);
688 self.batch
689 .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
690 }
691
692 Owner::ObjectOwner(parent) => {
694 if let Some(field_info) =
695 try_create_dynamic_field_info(&object, self.resolver.as_mut())?
696 {
697 let field_key = DynamicFieldKey::new(parent, object.id());
698
699 self.batch
700 .insert_batch(&self.tables.dynamic_field, [(field_key, field_info)])?;
701 }
702 }
703
704 Owner::Shared { .. } | Owner::Immutable => {}
705 }
706
707 if let Some((key, value)) = try_create_coin_index_info(&object) {
709 use std::collections::hash_map::Entry;
710
711 match self.coin_index.lock().unwrap().entry(key) {
712 Entry::Occupied(o) => {
713 let (key, v) = o.remove_entry();
714 let value = value.merge(v);
715 self.batch.insert_batch(&self.tables.coin, [(key, value)])?;
716 }
717 Entry::Vacant(v) => {
718 v.insert(value);
719 }
720 }
721 }
722
723 if self.batch.size_in_bytes() >= 1 << 27 {
726 std::mem::replace(&mut self.batch, self.tables.owner.batch()).write()?;
727 }
728
729 Ok(())
730 }
731
732 fn finish(self) -> Result<(), StorageError> {
733 self.batch.write()?;
734 Ok(())
735 }
736}