1use std::{
9 cmp::{max, min},
10 collections::{HashMap, HashSet},
11 path::{Path, PathBuf},
12 sync::{
13 Arc,
14 atomic::{AtomicU64, Ordering},
15 },
16};
17
18use iota_json_rpc_types::{IotaObjectDataFilter, TransactionFilter};
19use iota_types::{
20 base_types::{
21 IotaAddress, ObjectDigest, ObjectID, ObjectInfo, ObjectRef, SequenceNumber,
22 TransactionDigest, TxSequenceNumber,
23 },
24 digests::TransactionEventsDigest,
25 dynamic_field::{self, DynamicFieldInfo},
26 effects::TransactionEvents,
27 error::{IotaError, IotaResult, UserInputError},
28 inner_temporary_store::TxCoins,
29 object::{Object, Owner},
30 parse_iota_struct_tag,
31};
32use itertools::Itertools;
33use move_core_types::{
34 identifier::Identifier,
35 language_storage::{ModuleId, StructTag, TypeTag},
36};
37use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
38use serde::{Deserialize, Serialize, de::DeserializeOwned};
39use tokio::{sync::OwnedMutexGuard, task::spawn_blocking};
40use tracing::{debug, trace};
41use typed_store::{
42 DBMapUtils, TypedStoreError,
43 rocks::{DBBatch, DBMap, DBOptions, MetricConf, default_db_options, read_size_from_env},
44 traits::{Map, TableSummary, TypedStoreDebug},
45};
46
47use crate::{mutex_table::MutexTable, sharded_lru::ShardedLruCache};
48
/// Key of the owner index: `(owner address, object id)`.
type OwnerIndexKey = (IotaAddress, ObjectID);
/// Key of the coin index: `(owner address, coin type rendered as a string, coin object id)`.
type CoinIndexKey = (IotaAddress, String, ObjectID);
/// Key of the dynamic field index: `(parent object id, derived dynamic field id)`.
type DynamicFieldKey = (ObjectID, ObjectID);
/// Identifies an event as `(transaction sequence number, position of the event
/// inside that transaction's event list)`.
type EventId = (TxSequenceNumber, usize);
/// Value stored for every indexed event:
/// `(transaction events digest, transaction digest, timestamp in ms)`.
type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
/// Per-coin-type balances belonging to one address.
type AllBalance = HashMap<TypeTag, TotalBalance>;
55
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the maximum number of transactions a range query may span — confirm at the
/// call sites.
pub const MAX_TX_RANGE_SIZE: u64 = 4096;

/// NOTE(review): not referenced in the visible part of this file; presumably
/// the page-size cap for owned-object queries — confirm at the call sites.
pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
/// Environment variable read by `coin_index_table_default_config` to size the
/// read optimisation of the `coin_index` table.
const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
/// NOTE(review): not referenced in the visible part of this file; presumably
/// disables the balance caches when set — confirm.
const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
/// When this environment variable holds a value greater than zero, `index_tx`
/// invalidates the affected balance cache entries instead of updating them.
const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
62
/// Aggregated balance and coin count for one coin type.
///
/// Both fields are signed: `index_coin` also uses this type to accumulate
/// per-transaction deltas, which can be negative.
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
pub struct TotalBalance {
    /// Sum of coin balances (or the change of that sum).
    pub balance: i128,
    /// Number of coin objects (or the change of that number).
    pub num_coins: i64,
}
68
/// Owner-index and dynamic-field-index changes produced by one transaction.
///
/// `index_tx` applies the deletions before the insertions.
#[derive(Debug)]
pub struct ObjectIndexChanges {
    /// Owner-index keys to delete.
    pub deleted_owners: Vec<OwnerIndexKey>,
    /// Dynamic-field-index keys to delete.
    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
    /// Owner-index entries to insert.
    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
    /// Dynamic-field-index entries to insert.
    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
}
76
/// Value stored in the coin index for a single coin object.
#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct CoinInfo {
    /// Version of the coin object.
    pub version: SequenceNumber,
    /// Digest of the coin object.
    pub digest: ObjectDigest,
    /// Balance held by the coin.
    pub balance: u64,
    /// Digest of the transaction recorded as the coin's previous transaction.
    pub previous_transaction: TransactionDigest,
}
84
85impl CoinInfo {
86 pub fn from_object(object: &Object) -> Option<CoinInfo> {
87 object.as_coin_maybe().map(|coin| CoinInfo {
88 version: object.version(),
89 digest: object.digest(),
90 previous_transaction: object.previous_transaction,
91 balance: coin.value(),
92 })
93 }
94}
95
/// Prometheus counters for balance lookups.
///
/// NOTE(review): the increment sites are outside the visible part of this
/// file; the descriptions below follow the counter names only.
pub struct IndexStoreMetrics {
    /// Counter registered as `balance_lookup_from_db`.
    balance_lookup_from_db: IntCounter,
    /// Counter registered as `balance_lookup_from_total`.
    balance_lookup_from_total: IntCounter,
    /// Counter registered as `all_balance_lookup_from_db`.
    all_balance_lookup_from_db: IntCounter,
    /// Counter registered as `all_balance_lookup_from_total`.
    all_balance_lookup_from_total: IntCounter,
}
102
103impl IndexStoreMetrics {
104 pub fn new(registry: &Registry) -> IndexStoreMetrics {
105 Self {
106 balance_lookup_from_db: register_int_counter_with_registry!(
107 "balance_lookup_from_db",
108 "Total number of balance requests served from cache",
109 registry,
110 )
111 .unwrap(),
112 balance_lookup_from_total: register_int_counter_with_registry!(
113 "balance_lookup_from_total",
114 "Total number of balance requests served ",
115 registry,
116 )
117 .unwrap(),
118 all_balance_lookup_from_db: register_int_counter_with_registry!(
119 "all_balance_lookup_from_db",
120 "Total number of all balance requests served from cache",
121 registry,
122 )
123 .unwrap(),
124 all_balance_lookup_from_total: register_int_counter_with_registry!(
125 "all_balance_lookup_from_total",
126 "Total number of all balance requests served",
127 registry,
128 )
129 .unwrap(),
130 }
131 }
132}
133
134pub struct IndexStoreCaches {
137 per_coin_type_balance: ShardedLruCache<(IotaAddress, TypeTag), IotaResult<TotalBalance>>,
138 all_balances: ShardedLruCache<IotaAddress, IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
139 locks: MutexTable<IotaAddress>,
140}
141
/// Balance-cache changes computed by `index_coin`, together with the address
/// locks taken while computing them.
#[derive(Default)]
pub struct IndexStoreCacheUpdates {
    /// Address locks acquired by `index_coin`; they are released when this
    /// value is dropped.
    _locks: Vec<OwnedMutexGuard<()>>,
    /// Balance delta per `(address, coin type)`.
    per_coin_type_balance_changes: Vec<((IotaAddress, TypeTag), IotaResult<TotalBalance>)>,
    /// Balance deltas per address, grouped by coin type.
    all_balance_changes: Vec<(IotaAddress, IotaResult<Arc<AllBalance>>)>,
}
148
/// The database tables backing the index store.
#[derive(DBMapUtils)]
pub struct IndexStoreTables {
    /// `(sender address, tx sequence number)` -> transaction digest.
    #[default_options_override_fn = "transactions_from_addr_table_default_config"]
    transactions_from_addr: DBMap<(IotaAddress, TxSequenceNumber), TransactionDigest>,

    /// `(address, tx sequence number)` -> transaction digest, written for every
    /// address that owns an object mutated by the transaction.
    #[default_options_override_fn = "transactions_to_addr_table_default_config"]
    transactions_to_addr: DBMap<(IotaAddress, TxSequenceNumber), TransactionDigest>,

    /// `(input object id, tx sequence number)` -> transaction digest.
    /// Only written while `remove_deprecated_tables` is false.
    #[deprecated]
    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// `(mutated object id, tx sequence number)` -> transaction digest.
    /// Only written while `remove_deprecated_tables` is false.
    #[deprecated]
    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// `(package id, module name, function name, tx sequence number)` ->
    /// transaction digest.
    #[default_options_override_fn = "transactions_by_move_function_table_default_config"]
    transactions_by_move_function:
        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,

    /// Tx sequence number -> transaction digest.
    #[default_options_override_fn = "transactions_order_table_default_config"]
    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,

    /// Transaction digest -> tx sequence number (inverse of `transaction_order`).
    #[default_options_override_fn = "transactions_seq_table_default_config"]
    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,

    /// `(owner address, object id)` -> object info.
    #[default_options_override_fn = "owner_index_table_default_config"]
    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,

    /// `(owner address, coin type, object id)` -> coin info.
    #[default_options_override_fn = "coin_index_table_default_config"]
    coin_index: DBMap<CoinIndexKey, CoinInfo>,

    /// `(parent object id, dynamic field id)` -> dynamic field info.
    #[default_options_override_fn = "dynamic_field_index_table_default_config"]
    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,

    /// Event id -> event index entry, for every event.
    #[default_options_override_fn = "index_table_default_config"]
    event_order: DBMap<EventId, EventIndex>,

    /// Events keyed by the module built from the event's `package_id` and
    /// `transaction_module`.
    #[default_options_override_fn = "index_table_default_config"]
    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,

    /// Events keyed by the event's struct type.
    #[default_options_override_fn = "index_table_default_config"]
    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,

    /// Events keyed by the module (address and module name) of the event's
    /// struct type.
    #[default_options_override_fn = "index_table_default_config"]
    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,

    /// Events keyed by the event's sender.
    #[default_options_override_fn = "index_table_default_config"]
    event_by_sender: DBMap<(IotaAddress, EventId), EventIndex>,

    /// Events keyed by the transaction timestamp in milliseconds.
    #[default_options_override_fn = "index_table_default_config"]
    event_by_time: DBMap<(u64, EventId), EventIndex>,
}
223
impl IndexStoreTables {
    /// Returns the owner index table.
    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
        &self.owner_index
    }

    /// Returns the coin index table.
    pub fn coin_index(&self) -> &DBMap<CoinIndexKey, CoinInfo> {
        &self.coin_index
    }
}
233
/// Index over executed transactions, events, owned objects, coins and dynamic
/// fields.
pub struct IndexStore {
    /// Counter from which `index_tx` draws transaction sequence numbers.
    next_sequence_number: AtomicU64,
    /// The underlying database tables.
    tables: IndexStoreTables,
    /// Balance caches and the locks guarding their updates.
    caches: IndexStoreCaches,
    /// Balance lookup counters.
    metrics: Arc<IndexStoreMetrics>,
    /// Length of the `"Z…"` string used as an upper bound for module and
    /// function names in reverse move-function queries.
    max_type_length: u64,
    /// When true, the deprecated input/mutated-object tables are neither
    /// written nor queried.
    remove_deprecated_tables: bool,
}
245
246fn transactions_order_table_default_config() -> DBOptions {
248 default_db_options().disable_write_throttling()
249}
250fn transactions_seq_table_default_config() -> DBOptions {
251 default_db_options().disable_write_throttling()
252}
253fn transactions_from_addr_table_default_config() -> DBOptions {
254 default_db_options().disable_write_throttling()
255}
256fn transactions_to_addr_table_default_config() -> DBOptions {
257 default_db_options().disable_write_throttling()
258}
259fn transactions_by_move_function_table_default_config() -> DBOptions {
260 default_db_options().disable_write_throttling()
261}
262fn owner_index_table_default_config() -> DBOptions {
263 default_db_options().disable_write_throttling()
264}
265fn dynamic_field_index_table_default_config() -> DBOptions {
266 default_db_options().disable_write_throttling()
267}
268fn index_table_default_config() -> DBOptions {
269 default_db_options().disable_write_throttling()
270}
271fn coin_index_table_default_config() -> DBOptions {
272 default_db_options()
273 .optimize_for_write_throughput()
274 .optimize_for_read(
275 read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
276 )
277 .disable_write_throttling()
278}
279
280impl IndexStore {
281 pub fn new(
282 path: PathBuf,
283 registry: &Registry,
284 max_type_length: Option<u64>,
285 remove_deprecated_tables: bool,
286 ) -> Self {
287 let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
288 path,
289 MetricConf::new("index"),
290 None,
291 None,
292 remove_deprecated_tables,
293 );
294 let metrics = IndexStoreMetrics::new(registry);
295 let caches = IndexStoreCaches {
296 per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
297 all_balances: ShardedLruCache::new(1_000_000, 1000),
298 locks: MutexTable::new(128),
299 };
300 let next_sequence_number = tables
301 .transaction_order
302 .unbounded_iter()
303 .skip_to_last()
304 .next()
305 .map(|(seq, _)| seq + 1)
306 .unwrap_or(0)
307 .into();
308
309 Self {
310 tables,
311 next_sequence_number,
312 caches,
313 metrics: Arc::new(metrics),
314 max_type_length: max_type_length.unwrap_or(128),
315 remove_deprecated_tables,
316 }
317 }
318
    /// Returns the underlying index tables.
    pub fn tables(&self) -> &IndexStoreTables {
        &self.tables
    }
322
323 pub async fn index_coin(
324 &self,
325 digest: &TransactionDigest,
326 batch: &mut DBBatch,
327 object_index_changes: &ObjectIndexChanges,
328 tx_coins: Option<TxCoins>,
329 ) -> IotaResult<IndexStoreCacheUpdates> {
330 if tx_coins.is_none() {
334 return Ok(IndexStoreCacheUpdates::default());
335 }
336 let mut addresses: HashSet<IotaAddress> = HashSet::new();
338 addresses.extend(
339 object_index_changes
340 .deleted_owners
341 .iter()
342 .map(|(owner, _)| *owner),
343 );
344 addresses.extend(
345 object_index_changes
346 .new_owners
347 .iter()
348 .map(|((owner, _), _)| *owner),
349 );
350 let _locks = self.caches.locks.acquire_locks(addresses.into_iter()).await;
351 let mut balance_changes: HashMap<IotaAddress, HashMap<TypeTag, TotalBalance>> =
352 HashMap::new();
353 let (input_coins, written_coins) = tx_coins.unwrap();
355 let coin_delete_keys = object_index_changes
360 .deleted_owners
361 .iter()
362 .filter_map(|(owner, obj_id)| {
363 let object = input_coins.get(obj_id).or(written_coins.get(obj_id))?;
364 let coin_type_tag = object.coin_type_maybe().unwrap_or_else(|| {
365 panic!(
366 "object_id: {:?} is not a coin type, input_coins: {:?}, written_coins: {:?}, tx_digest: {:?}",
367 obj_id, input_coins, written_coins, digest
368 )
369 });
370 let map = balance_changes.entry(*owner).or_default();
371 let entry = map.entry(coin_type_tag.clone()).or_insert(TotalBalance {
372 num_coins: 0,
373 balance: 0
374 });
375 if let Ok(Some(coin_info)) = &self.tables.coin_index.get(&(*owner, coin_type_tag.to_string(), *obj_id)) {
376 entry.num_coins -= 1;
377 entry.balance -= coin_info.balance as i128;
378 }
379 Some((*owner, coin_type_tag.to_string(), *obj_id))
380 }).collect::<Vec<_>>();
381 trace!(
382 tx_digest=?digest,
383 "coin_delete_keys: {:?}",
384 coin_delete_keys,
385 );
386 batch.delete_batch(&self.tables.coin_index, coin_delete_keys.into_iter())?;
387
388 let coin_add_keys = object_index_changes
397 .new_owners
398 .iter()
399 .filter_map(|((owner, obj_id), obj_info)| {
400 let obj = written_coins.get(obj_id)?;
402 let coin_type_tag = obj.coin_type_maybe().unwrap_or_else(|| {
403 panic!(
404 "object_id: {:?} in written_coins is not a coin type, written_coins: {:?}, tx_digest: {:?}",
405 obj_id, written_coins, digest
406 )
407 });
408 let coin = obj.as_coin_maybe().unwrap_or_else(|| {
409 panic!(
410 "object_id: {:?} in written_coins cannot be deserialized as a Coin, written_coins: {:?}, tx_digest: {:?}",
411 obj_id, written_coins, digest
412 )
413 });
414 let map = balance_changes.entry(*owner).or_default();
415 let entry = map.entry(coin_type_tag.clone()).or_insert(TotalBalance {
416 num_coins: 0,
417 balance: 0
418 });
419 let result = self.tables.coin_index.get(&(*owner, coin_type_tag.to_string(), *obj_id));
420 if let Ok(Some(coin_info)) = &result {
421 entry.balance -= coin_info.balance as i128;
422 entry.balance += coin.balance.value() as i128;
423 } else if let Ok(None) = &result {
424 entry.num_coins += 1;
425 entry.balance += coin.balance.value() as i128;
426 }
427 Some(((*owner, coin_type_tag.to_string(), *obj_id), (CoinInfo {version: obj_info.version, digest: obj_info.digest, balance: coin.balance.value(), previous_transaction: *digest})))
428 }).collect::<Vec<_>>();
429 trace!(
430 tx_digest=?digest,
431 "coin_add_keys: {:?}",
432 coin_add_keys,
433 );
434
435 batch.insert_batch(&self.tables.coin_index, coin_add_keys.into_iter())?;
436
437 let per_coin_type_balance_changes: Vec<_> = balance_changes
438 .iter()
439 .flat_map(|(address, balance_map)| {
440 balance_map.iter().map(|(type_tag, balance)| {
441 (
442 (*address, type_tag.clone()),
443 Ok::<TotalBalance, IotaError>(*balance),
444 )
445 })
446 })
447 .collect();
448 let all_balance_changes: Vec<_> = balance_changes
449 .into_iter()
450 .map(|(address, balance_map)| {
451 (
452 address,
453 Ok::<Arc<HashMap<TypeTag, TotalBalance>>, IotaError>(Arc::new(balance_map)),
454 )
455 })
456 .collect();
457 let cache_updates = IndexStoreCacheUpdates {
458 _locks,
459 per_coin_type_balance_changes,
460 all_balance_changes,
461 };
462 Ok(cache_updates)
463 }
464
465 pub async fn index_tx(
468 &self,
469 sender: IotaAddress,
470 active_inputs: impl Iterator<Item = ObjectID>,
471 mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
472 move_functions: impl Iterator<Item = (ObjectID, Identifier, Identifier)> + Clone,
473 events: &TransactionEvents,
474 object_index_changes: ObjectIndexChanges,
475 digest: &TransactionDigest,
476 timestamp_ms: u64,
477 tx_coins: Option<TxCoins>,
478 ) -> IotaResult<u64> {
479 let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
480 let mut batch = self.tables.transactions_from_addr.batch();
481
482 batch.insert_batch(
483 &self.tables.transaction_order,
484 std::iter::once((sequence, *digest)),
485 )?;
486
487 batch.insert_batch(
488 &self.tables.transactions_seq,
489 std::iter::once((*digest, sequence)),
490 )?;
491
492 batch.insert_batch(
493 &self.tables.transactions_from_addr,
494 std::iter::once(((sender, sequence), *digest)),
495 )?;
496
497 #[allow(deprecated)]
498 if !self.remove_deprecated_tables {
499 batch.insert_batch(
500 &self.tables.transactions_by_input_object_id,
501 active_inputs.map(|id| ((id, sequence), *digest)),
502 )?;
503
504 batch.insert_batch(
505 &self.tables.transactions_by_mutated_object_id,
506 mutated_objects
507 .clone()
508 .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
509 )?;
510 }
511
512 batch.insert_batch(
513 &self.tables.transactions_by_move_function,
514 move_functions.map(|(obj_id, module, function)| {
515 (
516 (obj_id, module.to_string(), function.to_string(), sequence),
517 *digest,
518 )
519 }),
520 )?;
521
522 batch.insert_batch(
523 &self.tables.transactions_to_addr,
524 mutated_objects.filter_map(|(_, owner)| {
525 owner
526 .get_address_owner_address()
527 .ok()
528 .map(|addr| ((addr, sequence), digest))
529 }),
530 )?;
531
532 let cache_updates = self
534 .index_coin(digest, &mut batch, &object_index_changes, tx_coins)
535 .await?;
536
537 batch.delete_batch(
539 &self.tables.owner_index,
540 object_index_changes.deleted_owners.into_iter(),
541 )?;
542 batch.delete_batch(
543 &self.tables.dynamic_field_index,
544 object_index_changes.deleted_dynamic_fields.into_iter(),
545 )?;
546
547 batch.insert_batch(
548 &self.tables.owner_index,
549 object_index_changes.new_owners.into_iter(),
550 )?;
551
552 batch.insert_batch(
553 &self.tables.dynamic_field_index,
554 object_index_changes.new_dynamic_fields.into_iter(),
555 )?;
556
557 let event_digest = events.digest();
559 batch.insert_batch(
560 &self.tables.event_order,
561 events
562 .data
563 .iter()
564 .enumerate()
565 .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
566 )?;
567 batch.insert_batch(
568 &self.tables.event_by_move_module,
569 events
570 .data
571 .iter()
572 .enumerate()
573 .map(|(i, e)| {
574 (
575 i,
576 ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
577 )
578 })
579 .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
580 )?;
581 batch.insert_batch(
582 &self.tables.event_by_sender,
583 events.data.iter().enumerate().map(|(i, e)| {
584 (
585 (e.sender, (sequence, i)),
586 (event_digest, *digest, timestamp_ms),
587 )
588 }),
589 )?;
590 batch.insert_batch(
591 &self.tables.event_by_move_event,
592 events.data.iter().enumerate().map(|(i, e)| {
593 (
594 (e.type_.clone(), (sequence, i)),
595 (event_digest, *digest, timestamp_ms),
596 )
597 }),
598 )?;
599
600 batch.insert_batch(
601 &self.tables.event_by_time,
602 events.data.iter().enumerate().map(|(i, _)| {
603 (
604 (timestamp_ms, (sequence, i)),
605 (event_digest, *digest, timestamp_ms),
606 )
607 }),
608 )?;
609
610 batch.insert_batch(
611 &self.tables.event_by_event_module,
612 events.data.iter().enumerate().map(|(i, e)| {
613 (
614 (
615 ModuleId::new(e.type_.address, e.type_.module.clone()),
616 (sequence, i),
617 ),
618 (event_digest, *digest, timestamp_ms),
619 )
620 }),
621 )?;
622
623 let invalidate_caches =
624 read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
625
626 if invalidate_caches {
627 self.invalidate_per_coin_type_cache(
629 cache_updates
630 .per_coin_type_balance_changes
631 .iter()
632 .map(|x| x.0.clone()),
633 )
634 .await?;
635 self.invalidate_all_balance_cache(
636 cache_updates.all_balance_changes.iter().map(|x| x.0),
637 )
638 .await?;
639 }
640
641 batch.write()?;
642
643 if !invalidate_caches {
644 self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)
650 .await?;
651 self.update_all_balance_cache(cache_updates.all_balance_changes)
652 .await?;
653 }
654 Ok(sequence)
655 }
656
657 pub fn next_sequence_number(&self) -> TxSequenceNumber {
658 self.next_sequence_number.load(Ordering::SeqCst) + 1
659 }
660
661 pub fn get_transactions(
662 &self,
663 filter: Option<TransactionFilter>,
664 cursor: Option<TransactionDigest>,
665 limit: Option<usize>,
666 reverse: bool,
667 ) -> IotaResult<Vec<TransactionDigest>> {
668 let cursor = if let Some(cursor) = cursor {
670 Some(
671 self.get_transaction_seq(&cursor)?
672 .ok_or(IotaError::TransactionNotFound { digest: cursor })?,
673 )
674 } else {
675 None
676 };
677 match filter {
678 Some(TransactionFilter::MoveFunction {
679 package,
680 module,
681 function,
682 }) => Ok(self.get_transactions_by_move_function(
683 package, module, function, cursor, limit, reverse,
684 )?),
685 Some(TransactionFilter::InputObject(object_id)) => {
686 Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
687 }
688 Some(TransactionFilter::ChangedObject(object_id)) => {
689 Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
690 }
691 Some(TransactionFilter::FromAddress(address)) => {
692 Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
693 }
694 Some(TransactionFilter::ToAddress(address)) => {
695 Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
696 }
697 Some(_) => Err(IotaError::UserInput {
700 error: UserInputError::Unsupported(format!("{:?}", filter)),
701 }),
702 None => {
703 let iter = self.tables.transaction_order.unbounded_iter();
704
705 if reverse {
706 let iter = iter
707 .skip_prior_to(&cursor.unwrap_or(TxSequenceNumber::MAX))?
708 .reverse()
709 .skip(usize::from(cursor.is_some()))
710 .map(|(_, digest)| digest);
711 if let Some(limit) = limit {
712 Ok(iter.take(limit).collect())
713 } else {
714 Ok(iter.collect())
715 }
716 } else {
717 let iter = iter
718 .skip_to(&cursor.unwrap_or(TxSequenceNumber::MIN))?
719 .skip(usize::from(cursor.is_some()))
720 .map(|(_, digest)| digest);
721 if let Some(limit) = limit {
722 Ok(iter.take(limit).collect())
723 } else {
724 Ok(iter.collect())
725 }
726 }
727 }
728 }
729 }
730
731 fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
732 index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
733 key: KeyT,
734 cursor: Option<TxSequenceNumber>,
735 limit: Option<usize>,
736 reverse: bool,
737 ) -> IotaResult<Vec<TransactionDigest>> {
738 Ok(if reverse {
739 let iter = index
740 .unbounded_iter()
741 .skip_prior_to(&(key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX)))?
742 .reverse()
743 .skip(usize::from(cursor.is_some()))
745 .take_while(|((id, _), _)| *id == key)
746 .map(|(_, digest)| digest);
747 if let Some(limit) = limit {
748 iter.take(limit).collect()
749 } else {
750 iter.collect()
751 }
752 } else {
753 let iter = index
754 .unbounded_iter()
755 .skip_to(&(key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN)))?
756 .skip(usize::from(cursor.is_some()))
758 .take_while(|((id, _), _)| *id == key)
759 .map(|(_, digest)| digest);
760 if let Some(limit) = limit {
761 iter.take(limit).collect()
762 } else {
763 iter.collect()
764 }
765 })
766 }
767
768 pub fn get_transactions_by_input_object(
769 &self,
770 input_object: ObjectID,
771 cursor: Option<TxSequenceNumber>,
772 limit: Option<usize>,
773 reverse: bool,
774 ) -> IotaResult<Vec<TransactionDigest>> {
775 if self.remove_deprecated_tables {
776 return Ok(vec![]);
777 }
778 #[allow(deprecated)]
779 Self::get_transactions_from_index(
780 &self.tables.transactions_by_input_object_id,
781 input_object,
782 cursor,
783 limit,
784 reverse,
785 )
786 }
787
788 pub fn get_transactions_by_mutated_object(
789 &self,
790 mutated_object: ObjectID,
791 cursor: Option<TxSequenceNumber>,
792 limit: Option<usize>,
793 reverse: bool,
794 ) -> IotaResult<Vec<TransactionDigest>> {
795 if self.remove_deprecated_tables {
796 return Ok(vec![]);
797 }
798 #[allow(deprecated)]
799 Self::get_transactions_from_index(
800 &self.tables.transactions_by_mutated_object_id,
801 mutated_object,
802 cursor,
803 limit,
804 reverse,
805 )
806 }
807
808 pub fn get_transactions_from_addr(
809 &self,
810 addr: IotaAddress,
811 cursor: Option<TxSequenceNumber>,
812 limit: Option<usize>,
813 reverse: bool,
814 ) -> IotaResult<Vec<TransactionDigest>> {
815 Self::get_transactions_from_index(
816 &self.tables.transactions_from_addr,
817 addr,
818 cursor,
819 limit,
820 reverse,
821 )
822 }
823
824 pub fn get_transactions_by_move_function(
825 &self,
826 package: ObjectID,
827 module: Option<String>,
828 function: Option<String>,
829 cursor: Option<TxSequenceNumber>,
830 limit: Option<usize>,
831 reverse: bool,
832 ) -> IotaResult<Vec<TransactionDigest>> {
833 if function.is_some() && module.is_none() {
835 return Err(IotaError::UserInput {
836 error: UserInputError::MoveFunctionInput(
837 "Cannot supply function without supplying module".to_string(),
838 ),
839 });
840 }
841
842 if cursor.is_some() && (module.is_none() || function.is_none()) {
844 return Err(IotaError::UserInput {
845 error: UserInputError::MoveFunctionInput(
846 "Cannot supply cursor without supplying module and function".to_string(),
847 ),
848 });
849 }
850
851 let cursor_val = cursor.unwrap_or(if reverse {
852 TxSequenceNumber::MAX
853 } else {
854 TxSequenceNumber::MIN
855 });
856
857 let max_string = "Z".repeat(self.max_type_length.try_into().unwrap());
858 let module_val = module.clone().unwrap_or(if reverse {
859 max_string.clone()
860 } else {
861 "".to_string()
862 });
863
864 let function_val =
865 function
866 .clone()
867 .unwrap_or(if reverse { max_string } else { "".to_string() });
868
869 let key = (package, module_val, function_val, cursor_val);
870 let iter = self.tables.transactions_by_move_function.unbounded_iter();
871 Ok(if reverse {
872 let iter = iter
873 .skip_prior_to(&key)?
874 .reverse()
875 .skip(usize::from(cursor.is_some()))
877 .take_while(|((id, m, f, _), _)| {
878 *id == package
879 && module.as_ref().map(|x| x == m).unwrap_or(true)
880 && function.as_ref().map(|x| x == f).unwrap_or(true)
881 })
882 .map(|(_, digest)| digest);
883 if let Some(limit) = limit {
884 iter.take(limit).collect()
885 } else {
886 iter.collect()
887 }
888 } else {
889 let iter = iter
890 .skip_to(&key)?
891 .skip(usize::from(cursor.is_some()))
893 .take_while(|((id, m, f, _), _)| {
894 *id == package
895 && module.as_ref().map(|x| x == m).unwrap_or(true)
896 && function.as_ref().map(|x| x == f).unwrap_or(true)
897 })
898 .map(|(_, digest)| digest);
899 if let Some(limit) = limit {
900 iter.take(limit).collect()
901 } else {
902 iter.collect()
903 }
904 })
905 }
906
907 pub fn get_transactions_to_addr(
908 &self,
909 addr: IotaAddress,
910 cursor: Option<TxSequenceNumber>,
911 limit: Option<usize>,
912 reverse: bool,
913 ) -> IotaResult<Vec<TransactionDigest>> {
914 Self::get_transactions_from_index(
915 &self.tables.transactions_to_addr,
916 addr,
917 cursor,
918 limit,
919 reverse,
920 )
921 }
922
923 pub fn get_transaction_seq(
924 &self,
925 digest: &TransactionDigest,
926 ) -> IotaResult<Option<TxSequenceNumber>> {
927 Ok(self.tables.transactions_seq.get(digest)?)
928 }
929
930 pub fn all_events(
931 &self,
932 tx_seq: TxSequenceNumber,
933 event_seq: usize,
934 limit: usize,
935 descending: bool,
936 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
937 Ok(if descending {
938 self.tables
939 .event_order
940 .unbounded_iter()
941 .skip_prior_to(&(tx_seq, event_seq))?
942 .reverse()
943 .take(limit)
944 .map(|((_, event_seq), (digest, tx_digest, time))| {
945 (digest, tx_digest, event_seq, time)
946 })
947 .collect()
948 } else {
949 self.tables
950 .event_order
951 .unbounded_iter()
952 .skip_to(&(tx_seq, event_seq))?
953 .take(limit)
954 .map(|((_, event_seq), (digest, tx_digest, time))| {
955 (digest, tx_digest, event_seq, time)
956 })
957 .collect()
958 })
959 }
960
961 pub fn events_by_transaction(
962 &self,
963 digest: &TransactionDigest,
964 tx_seq: TxSequenceNumber,
965 event_seq: usize,
966 limit: usize,
967 descending: bool,
968 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
969 let seq = self
970 .get_transaction_seq(digest)?
971 .ok_or(IotaError::TransactionNotFound { digest: *digest })?;
972 Ok(if descending {
973 self.tables
974 .event_order
975 .unbounded_iter()
976 .skip_prior_to(&(min(tx_seq, seq), event_seq))?
977 .reverse()
978 .take_while(|((tx, _), _)| tx == &seq)
979 .take(limit)
980 .map(|((_, event_seq), (digest, tx_digest, time))| {
981 (digest, tx_digest, event_seq, time)
982 })
983 .collect()
984 } else {
985 self.tables
986 .event_order
987 .unbounded_iter()
988 .skip_to(&(max(tx_seq, seq), event_seq))?
989 .take_while(|((tx, _), _)| tx == &seq)
990 .take(limit)
991 .map(|((_, event_seq), (digest, tx_digest, time))| {
992 (digest, tx_digest, event_seq, time)
993 })
994 .collect()
995 })
996 }
997
998 fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
999 index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1000 key: &KeyT,
1001 tx_seq: TxSequenceNumber,
1002 event_seq: usize,
1003 limit: usize,
1004 descending: bool,
1005 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1006 Ok(if descending {
1007 index
1008 .unbounded_iter()
1009 .skip_prior_to(&(key.clone(), (tx_seq, event_seq)))?
1010 .reverse()
1011 .take_while(|((m, _), _)| m == key)
1012 .take(limit)
1013 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1014 (digest, tx_digest, event_seq, time)
1015 })
1016 .collect()
1017 } else {
1018 index
1019 .unbounded_iter()
1020 .skip_to(&(key.clone(), (tx_seq, event_seq)))?
1021 .take_while(|((m, _), _)| m == key)
1022 .take(limit)
1023 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1024 (digest, tx_digest, event_seq, time)
1025 })
1026 .collect()
1027 })
1028 }
1029
1030 pub fn events_by_module_id(
1031 &self,
1032 module: &ModuleId,
1033 tx_seq: TxSequenceNumber,
1034 event_seq: usize,
1035 limit: usize,
1036 descending: bool,
1037 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1038 Self::get_event_from_index(
1039 &self.tables.event_by_move_module,
1040 module,
1041 tx_seq,
1042 event_seq,
1043 limit,
1044 descending,
1045 )
1046 }
1047
1048 pub fn events_by_move_event_struct_name(
1049 &self,
1050 struct_name: &StructTag,
1051 tx_seq: TxSequenceNumber,
1052 event_seq: usize,
1053 limit: usize,
1054 descending: bool,
1055 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1056 Self::get_event_from_index(
1057 &self.tables.event_by_move_event,
1058 struct_name,
1059 tx_seq,
1060 event_seq,
1061 limit,
1062 descending,
1063 )
1064 }
1065
1066 pub fn events_by_move_event_module(
1067 &self,
1068 module_id: &ModuleId,
1069 tx_seq: TxSequenceNumber,
1070 event_seq: usize,
1071 limit: usize,
1072 descending: bool,
1073 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1074 Self::get_event_from_index(
1075 &self.tables.event_by_event_module,
1076 module_id,
1077 tx_seq,
1078 event_seq,
1079 limit,
1080 descending,
1081 )
1082 }
1083
1084 pub fn events_by_sender(
1085 &self,
1086 sender: &IotaAddress,
1087 tx_seq: TxSequenceNumber,
1088 event_seq: usize,
1089 limit: usize,
1090 descending: bool,
1091 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1092 Self::get_event_from_index(
1093 &self.tables.event_by_sender,
1094 sender,
1095 tx_seq,
1096 event_seq,
1097 limit,
1098 descending,
1099 )
1100 }
1101
1102 pub fn event_iterator(
1103 &self,
1104 start_time: u64,
1105 end_time: u64,
1106 tx_seq: TxSequenceNumber,
1107 event_seq: usize,
1108 limit: usize,
1109 descending: bool,
1110 ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1111 Ok(if descending {
1112 self.tables
1113 .event_by_time
1114 .unbounded_iter()
1115 .skip_prior_to(&(end_time, (tx_seq, event_seq)))?
1116 .reverse()
1117 .take_while(|((m, _), _)| m >= &start_time)
1118 .take(limit)
1119 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1120 (digest, tx_digest, event_seq, time)
1121 })
1122 .collect()
1123 } else {
1124 self.tables
1125 .event_by_time
1126 .unbounded_iter()
1127 .skip_to(&(start_time, (tx_seq, event_seq)))?
1128 .take_while(|((m, _), _)| m <= &end_time)
1129 .take(limit)
1130 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1131 (digest, tx_digest, event_seq, time)
1132 })
1133 .collect()
1134 })
1135 }
1136
1137 pub fn get_dynamic_fields_iterator(
1138 &self,
1139 object: ObjectID,
1140 cursor: Option<ObjectID>,
1141 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1142 {
1143 debug!(?object, "get_dynamic_fields");
1144 let iter_lower_bound = (object, ObjectID::ZERO);
1145 let iter_upper_bound = (object, ObjectID::MAX);
1146 Ok(self
1147 .tables
1148 .dynamic_field_index
1149 .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
1150 .skip_to(&(object, cursor.unwrap_or(ObjectID::ZERO)))?
1152 .skip(usize::from(cursor.is_some()))
1154 .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
1155 .map_ok(|((_, c), object_info)| (c, object_info)))
1156 }
1157
1158 pub fn get_dynamic_field_object_id(
1159 &self,
1160 object: ObjectID,
1161 name_type: TypeTag,
1162 name_bcs_bytes: &[u8],
1163 ) -> IotaResult<Option<ObjectID>> {
1164 debug!(?object, "get_dynamic_field_object_id");
1165 let dynamic_field_id =
1166 dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1167 |e| {
1168 IotaError::Unknown(format!(
1169 "Unable to generate dynamic field id. Got error: {e:?}"
1170 ))
1171 },
1172 )?;
1173
1174 if let Some(info) = self
1175 .tables
1176 .dynamic_field_index
1177 .get(&(object, dynamic_field_id))?
1178 {
1179 debug_assert!(
1181 info.object_id == dynamic_field_id
1182 || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1183 );
1184 return Ok(Some(info.object_id));
1185 }
1186
1187 let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1188 let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1189 let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1190 object,
1191 &dynamic_object_field_type,
1192 name_bcs_bytes,
1193 )
1194 .map_err(|e| {
1195 IotaError::Unknown(format!(
1196 "Unable to generate dynamic field id. Got error: {e:?}"
1197 ))
1198 })?;
1199 if let Some(info) = self
1200 .tables
1201 .dynamic_field_index
1202 .get(&(object, dynamic_object_field_id))?
1203 {
1204 return Ok(Some(info.object_id));
1205 }
1206
1207 Ok(None)
1208 }
1209
1210 pub fn get_owner_objects(
1211 &self,
1212 owner: IotaAddress,
1213 cursor: Option<ObjectID>,
1214 limit: usize,
1215 filter: Option<IotaObjectDataFilter>,
1216 ) -> IotaResult<Vec<ObjectInfo>> {
1217 let cursor = match cursor {
1218 Some(cursor) => cursor,
1219 None => ObjectID::ZERO,
1220 };
1221 Ok(self
1222 .get_owner_objects_iterator(owner, cursor, filter)?
1223 .take(limit)
1224 .collect())
1225 }
1226
1227 pub fn get_owned_coins_iterator(
1228 coin_index: &DBMap<CoinIndexKey, CoinInfo>,
1229 owner: IotaAddress,
1230 coin_type_tag: Option<String>,
1231 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
1232 let all_coins = coin_type_tag.is_none();
1233 let starting_coin_type =
1234 coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1235 Ok(coin_index
1236 .unbounded_iter()
1237 .skip_to(&(owner, starting_coin_type.clone(), ObjectID::ZERO))?
1238 .take_while(move |((addr, coin_type, _), _)| {
1239 if addr != &owner {
1240 return false;
1241 }
1242 if !all_coins && &starting_coin_type != coin_type {
1243 return false;
1244 }
1245 true
1246 })
1247 .map(|((_, coin_type, obj_id), coin)| (coin_type, obj_id, coin)))
1248 }
1249
1250 pub fn get_owned_coins_iterator_with_cursor(
1251 &self,
1252 owner: IotaAddress,
1253 cursor: (String, ObjectID),
1254 limit: usize,
1255 one_coin_type_only: bool,
1256 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
1257 let (starting_coin_type, starting_object_id) = cursor;
1258 Ok(self
1259 .tables
1260 .coin_index
1261 .unbounded_iter()
1262 .skip_to(&(owner, starting_coin_type.clone(), starting_object_id))?
1263 .filter(move |((_, _, obj_id), _)| obj_id != &starting_object_id)
1264 .enumerate()
1265 .take_while(move |(index, ((addr, coin_type, _), _))| {
1266 if *index >= limit {
1267 return false;
1268 }
1269 if addr != &owner {
1270 return false;
1271 }
1272 if one_coin_type_only && &starting_coin_type != coin_type {
1273 return false;
1274 }
1275 true
1276 })
1277 .map(|(_, ((_, coin_type, obj_id), coin))| (coin_type, obj_id, coin)))
1278 }
1279
1280 pub fn get_owner_objects_iterator(
1284 &self,
1285 owner: IotaAddress,
1286 starting_object_id: ObjectID,
1287 filter: Option<IotaObjectDataFilter>,
1288 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
1289 Ok(self
1290 .tables
1291 .owner_index
1292 .unbounded_iter()
1293 .skip_to(&(owner, starting_object_id))?
1295 .skip(usize::from(starting_object_id != ObjectID::ZERO))
1296 .take_while(move |((address_owner, _), _)| address_owner == &owner)
1297 .filter(move |(_, o)| {
1298 if let Some(filter) = filter.as_ref() {
1299 filter.matches(o)
1300 } else {
1301 true
1302 }
1303 })
1304 .map(|(_, object_info)| object_info))
1305 }
1306
1307 pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> IotaResult {
1308 let mut batch = self.tables.owner_index.batch();
1309 batch.insert_batch(
1310 &self.tables.owner_index,
1311 object_index_changes.new_owners.into_iter(),
1312 )?;
1313 batch.insert_batch(
1314 &self.tables.dynamic_field_index,
1315 object_index_changes.new_dynamic_fields.into_iter(),
1316 )?;
1317 batch.write()?;
1318 Ok(())
1319 }
1320
1321 pub fn is_empty(&self) -> bool {
1322 self.tables.owner_index.is_empty()
1323 }
1324
1325 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
1326 self.tables
1328 .transactions_from_addr
1329 .checkpoint_db(path)
1330 .map_err(Into::into)
1331 }
1332
1333 pub async fn get_balance(
1339 &self,
1340 owner: IotaAddress,
1341 coin_type: TypeTag,
1342 ) -> IotaResult<TotalBalance> {
1343 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1344 let cloned_coin_type = coin_type.clone();
1345 let metrics_cloned = self.metrics.clone();
1346 let coin_index_cloned = self.tables.coin_index.clone();
1347 if force_disable_cache {
1348 return spawn_blocking(move || {
1349 Self::get_balance_from_db(
1350 metrics_cloned,
1351 coin_index_cloned,
1352 owner,
1353 cloned_coin_type,
1354 )
1355 })
1356 .await
1357 .unwrap()
1358 .map_err(|e| IotaError::Execution(format!("Failed to read balance frm DB: {:?}", e)));
1359 }
1360
1361 self.metrics.balance_lookup_from_total.inc();
1362
1363 let balance = self
1364 .caches
1365 .per_coin_type_balance
1366 .get(&(owner, coin_type.clone()))
1367 .await;
1368 if let Some(balance) = balance {
1369 return balance;
1370 }
1371 let all_balance = self.caches.all_balances.get(&owner.clone()).await;
1373 if let Some(Ok(all_balance)) = all_balance {
1374 if let Some(balance) = all_balance.get(&coin_type) {
1375 return Ok(*balance);
1376 }
1377 }
1378 let cloned_coin_type = coin_type.clone();
1379 let metrics_cloned = self.metrics.clone();
1380 let coin_index_cloned = self.tables.coin_index.clone();
1381 self.caches
1382 .per_coin_type_balance
1383 .get_with((owner, coin_type), async move {
1384 spawn_blocking(move || {
1385 Self::get_balance_from_db(
1386 metrics_cloned,
1387 coin_index_cloned,
1388 owner,
1389 cloned_coin_type,
1390 )
1391 })
1392 .await
1393 .unwrap()
1394 .map_err(|e| {
1395 IotaError::Execution(format!("Failed to read balance frm DB: {:?}", e))
1396 })
1397 })
1398 .await
1399 }
1400
1401 pub async fn get_all_balance(
1408 &self,
1409 owner: IotaAddress,
1410 ) -> IotaResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1411 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1412 let metrics_cloned = self.metrics.clone();
1413 let coin_index_cloned = self.tables.coin_index.clone();
1414 if force_disable_cache {
1415 return spawn_blocking(move || {
1416 Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
1417 })
1418 .await
1419 .unwrap()
1420 .map_err(|e| {
1421 IotaError::Execution(format!("Failed to read all balance from DB: {:?}", e))
1422 });
1423 }
1424
1425 self.metrics.all_balance_lookup_from_total.inc();
1426 let metrics_cloned = self.metrics.clone();
1427 let coin_index_cloned = self.tables.coin_index.clone();
1428 self.caches
1429 .all_balances
1430 .get_with(owner, async move {
1431 spawn_blocking(move || {
1432 Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
1433 })
1434 .await
1435 .unwrap()
1436 .map_err(|e| {
1437 IotaError::Execution(format!("Failed to read all balance from DB: {:?}", e))
1438 })
1439 })
1440 .await
1441 .map(|mut balances_map| {
1442 Arc::make_mut(&mut balances_map)
1443 .retain(|_, TotalBalance { num_coins, .. }| *num_coins > 0);
1444 balances_map
1445 })
1446 }
1447
1448 pub fn get_balance_from_db(
1451 metrics: Arc<IndexStoreMetrics>,
1452 coin_index: DBMap<CoinIndexKey, CoinInfo>,
1453 owner: IotaAddress,
1454 coin_type: TypeTag,
1455 ) -> IotaResult<TotalBalance> {
1456 metrics.balance_lookup_from_db.inc();
1457 let coin_type_str = coin_type.to_string();
1458 let coins =
1459 Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?
1460 .map(|(_coin_type, obj_id, coin)| (coin_type_str.clone(), obj_id, coin));
1461
1462 let mut balance = 0i128;
1463 let mut num_coins = 0;
1464 for (_coin_type, _obj_id, coin_info) in coins {
1465 balance += coin_info.balance as i128;
1466 num_coins += 1;
1467 }
1468 Ok(TotalBalance { balance, num_coins })
1469 }
1470
1471 pub fn get_all_balances_from_db(
1473 metrics: Arc<IndexStoreMetrics>,
1474 coin_index: DBMap<CoinIndexKey, CoinInfo>,
1475 owner: IotaAddress,
1476 ) -> IotaResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1477 metrics.all_balance_lookup_from_db.inc();
1478 let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
1479 let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
1480 .chunk_by(|(coin_type, _obj_id, _coin)| coin_type.clone());
1481 for (coin_type, coins) in &coins {
1482 let mut total_balance = 0i128;
1483 let mut coin_object_count = 0;
1484 for (_coin_type, _obj_id, coin_info) in coins {
1485 total_balance += coin_info.balance as i128;
1486 coin_object_count += 1;
1487 }
1488 let coin_type = TypeTag::Struct(Box::new(parse_iota_struct_tag(&coin_type).map_err(
1489 |e| IotaError::Execution(format!("Failed to parse event sender address: {:?}", e)),
1490 )?));
1491 balances.insert(
1492 coin_type,
1493 TotalBalance {
1494 num_coins: coin_object_count,
1495 balance: total_balance,
1496 },
1497 );
1498 }
1499 Ok(Arc::new(balances))
1500 }
1501
1502 async fn invalidate_per_coin_type_cache(
1503 &self,
1504 keys: impl IntoIterator<Item = (IotaAddress, TypeTag)>,
1505 ) -> IotaResult {
1506 self.caches
1507 .per_coin_type_balance
1508 .batch_invalidate(keys)
1509 .await;
1510 Ok(())
1511 }
1512
1513 async fn invalidate_all_balance_cache(
1514 &self,
1515 addresses: impl IntoIterator<Item = IotaAddress>,
1516 ) -> IotaResult {
1517 self.caches.all_balances.batch_invalidate(addresses).await;
1518 Ok(())
1519 }
1520
1521 async fn update_per_coin_type_cache(
1522 &self,
1523 keys: impl IntoIterator<Item = ((IotaAddress, TypeTag), IotaResult<TotalBalance>)>,
1524 ) -> IotaResult {
1525 self.caches
1526 .per_coin_type_balance
1527 .batch_merge(keys, Self::merge_balance)
1528 .await;
1529 Ok(())
1530 }
1531
1532 fn merge_balance(
1533 old_balance: &IotaResult<TotalBalance>,
1534 balance_delta: &IotaResult<TotalBalance>,
1535 ) -> IotaResult<TotalBalance> {
1536 if let Ok(old_balance) = old_balance {
1537 if let Ok(balance_delta) = balance_delta {
1538 Ok(TotalBalance {
1539 balance: old_balance.balance + balance_delta.balance,
1540 num_coins: old_balance.num_coins + balance_delta.num_coins,
1541 })
1542 } else {
1543 balance_delta.clone()
1544 }
1545 } else {
1546 old_balance.clone()
1547 }
1548 }
1549
1550 async fn update_all_balance_cache(
1551 &self,
1552 keys: impl IntoIterator<Item = (IotaAddress, IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
1553 ) -> IotaResult {
1554 self.caches
1555 .all_balances
1556 .batch_merge(keys, Self::merge_all_balance)
1557 .await;
1558 Ok(())
1559 }
1560
1561 fn merge_all_balance(
1562 old_balance: &IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1563 balance_delta: &IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1564 ) -> IotaResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1565 if let Ok(old_balance) = old_balance {
1566 if let Ok(balance_delta) = balance_delta {
1567 let mut new_balance = HashMap::new();
1568 for (key, value) in old_balance.iter() {
1569 new_balance.insert(key.clone(), *value);
1570 }
1571 for (key, delta) in balance_delta.iter() {
1572 let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
1573 balance: 0,
1574 num_coins: 0,
1575 });
1576 let new_total = TotalBalance {
1577 balance: old.balance + delta.balance,
1578 num_coins: old.num_coins + delta.num_coins,
1579 };
1580 new_balance.insert(key.clone(), new_total);
1581 }
1582 Ok(Arc::new(new_balance))
1583 } else {
1584 balance_delta.clone()
1585 }
1586 } else {
1587 old_balance.clone()
1588 }
1589 }
1590}
1591
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env::temp_dir};

    use iota_types::{
        base_types::{IotaAddress, ObjectInfo, ObjectType},
        digests::TransactionDigest,
        effects::TransactionEvents,
        gas_coin::GAS,
        object::{self, Owner},
    };
    use move_core_types::account_address::AccountAddress;
    use prometheus::Registry;

    use crate::{IndexStore, indexes::ObjectIndexChanges};

    /// Indexes ten gas coins of balance 100 for a random address and checks
    /// that the DB, `get_balance` and `get_all_balance` all report 1000 / 10
    /// coins. A second `index_tx` call then lists three of those coins as
    /// deleted owners, after which all three read paths must report 700 / 7
    /// coins, including after the per-coin-type cache entry is invalidated.
    #[tokio::test]
    async fn test_index_cache() -> anyhow::Result<()> {
        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
        let address: IotaAddress = AccountAddress::random().into();

        // Reads the balance straight from the coin index, bypassing the caches.
        let read_db_balance = || {
            IndexStore::get_balance_from_db(
                index_store.metrics.clone(),
                index_store.tables.coin_index.clone(),
                address,
                GAS::type_tag(),
            )
        };

        // Phase 1: ten new coins, each worth 100.
        let mut object_map = BTreeMap::new();
        let mut written_objects = BTreeMap::new();
        let mut new_objects = Vec::new();
        for _ in 0..10 {
            let coin = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
            let coin_info = ObjectInfo {
                object_id: coin.id(),
                version: coin.version(),
                digest: coin.digest(),
                type_: ObjectType::Struct(coin.type_().unwrap().clone()),
                owner: Owner::AddressOwner(address),
                previous_transaction: coin.previous_transaction,
            };
            new_objects.push(((address, coin.id()), coin_info));
            object_map.insert(coin.id(), coin.clone());
            written_objects.insert(coin.data.id(), coin);
        }
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners: new_objects,
            new_dynamic_fields: vec![],
        };

        let tx_coins = (object_map.clone(), written_objects.clone());
        index_store
            .index_tx(
                address,
                vec![].into_iter(),
                vec![].into_iter(),
                vec![].into_iter(),
                &TransactionEvents { data: vec![] },
                object_index_changes,
                &TransactionDigest::random(),
                1234,
                Some(tx_coins),
            )
            .await?;

        let balance_from_db = read_db_balance()?;
        let balance = index_store.get_balance(address, GAS::type_tag()).await?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        let all_balance = index_store.get_all_balance(address).await?;
        let balance = all_balance.get(&GAS::type_tag()).unwrap();
        assert_eq!(*balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        // Phase 2: the first three coins (in map order) become deleted owners
        // and are the only entries left in `written_objects`.
        written_objects.clear();
        let deleted_objects: Vec<_> = object_map
            .keys()
            .take(3)
            .map(|id| (address, *id))
            .collect();
        written_objects.extend(
            object_map
                .values()
                .take(3)
                .map(|coin| (coin.data.id(), coin.clone())),
        );
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: deleted_objects,
            deleted_dynamic_fields: vec![],
            new_owners: vec![],
            new_dynamic_fields: vec![],
        };
        let tx_coins = (object_map, written_objects);
        index_store
            .index_tx(
                address,
                vec![].into_iter(),
                vec![].into_iter(),
                vec![].into_iter(),
                &TransactionEvents { data: vec![] },
                object_index_changes,
                &TransactionDigest::random(),
                1234,
                Some(tx_coins),
            )
            .await?;

        let balance_from_db = read_db_balance()?;
        let balance = index_store.get_balance(address, GAS::type_tag()).await?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);

        // Drop the per-coin-type entry so the next `get_balance` cannot be
        // answered from it.
        index_store
            .caches
            .per_coin_type_balance
            .invalidate(&(address, GAS::type_tag()))
            .await;
        let all_balance = index_store.get_all_balance(address).await?;
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
        let balance = index_store.get_balance(address, GAS::type_tag()).await?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);

        Ok(())
    }
}