iota_storage/
indexes.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! IndexStore supports creation of various ancillary indexes of state in
6//! IotaDataStore. The main user of this data is the explorer.
7
8use std::{
9    cmp::{max, min},
10    collections::{HashMap, HashSet},
11    path::{Path, PathBuf},
12    sync::{
13        Arc,
14        atomic::{AtomicU64, Ordering},
15    },
16};
17
18use iota_json_rpc_types::{IotaObjectDataFilter, TransactionFilter};
19use iota_types::{
20    base_types::{
21        IotaAddress, ObjectDigest, ObjectID, ObjectInfo, ObjectRef, SequenceNumber,
22        TransactionDigest, TxSequenceNumber,
23    },
24    digests::TransactionEventsDigest,
25    dynamic_field::{self, DynamicFieldInfo},
26    effects::TransactionEvents,
27    error::{IotaError, IotaResult, UserInputError},
28    inner_temporary_store::TxCoins,
29    object::{Object, Owner},
30    parse_iota_struct_tag,
31};
32use itertools::Itertools;
33use move_core_types::{
34    identifier::Identifier,
35    language_storage::{ModuleId, StructTag, TypeTag},
36};
37use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
38use serde::{Deserialize, Serialize, de::DeserializeOwned};
39use tokio::{sync::OwnedMutexGuard, task::spawn_blocking};
40use tracing::{debug, trace};
41use typed_store::{
42    DBMapUtils, TypedStoreError,
43    rocks::{DBBatch, DBMap, DBOptions, MetricConf, default_db_options, read_size_from_env},
44    traits::{Map, TableSummary, TypedStoreDebug},
45};
46
47use crate::{mutex_table::MutexTable, sharded_lru::ShardedLruCache};
48
49type OwnerIndexKey = (IotaAddress, ObjectID);
50type CoinIndexKey = (IotaAddress, String, ObjectID);
51type DynamicFieldKey = (ObjectID, ObjectID);
52type EventId = (TxSequenceNumber, usize);
53type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
54type AllBalance = HashMap<TypeTag, TotalBalance>;
55
56pub const MAX_TX_RANGE_SIZE: u64 = 4096;
57
58pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
59const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
60const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
61const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
62
63#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
64pub struct TotalBalance {
65    pub balance: i128,
66    pub num_coins: i64,
67}
68
69#[derive(Debug)]
70pub struct ObjectIndexChanges {
71    pub deleted_owners: Vec<OwnerIndexKey>,
72    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
73    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
74    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
75}
76
77#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
78pub struct CoinInfo {
79    pub version: SequenceNumber,
80    pub digest: ObjectDigest,
81    pub balance: u64,
82    pub previous_transaction: TransactionDigest,
83}
84
85impl CoinInfo {
86    pub fn from_object(object: &Object) -> Option<CoinInfo> {
87        object.as_coin_maybe().map(|coin| CoinInfo {
88            version: object.version(),
89            digest: object.digest(),
90            previous_transaction: object.previous_transaction,
91            balance: coin.value(),
92        })
93    }
94}
95
96pub struct IndexStoreMetrics {
97    balance_lookup_from_db: IntCounter,
98    balance_lookup_from_total: IntCounter,
99    all_balance_lookup_from_db: IntCounter,
100    all_balance_lookup_from_total: IntCounter,
101}
102
103impl IndexStoreMetrics {
104    pub fn new(registry: &Registry) -> IndexStoreMetrics {
105        Self {
106            balance_lookup_from_db: register_int_counter_with_registry!(
107                "balance_lookup_from_db",
108                "Total number of balance requests served from cache",
109                registry,
110            )
111            .unwrap(),
112            balance_lookup_from_total: register_int_counter_with_registry!(
113                "balance_lookup_from_total",
114                "Total number of balance requests served ",
115                registry,
116            )
117            .unwrap(),
118            all_balance_lookup_from_db: register_int_counter_with_registry!(
119                "all_balance_lookup_from_db",
120                "Total number of all balance requests served from cache",
121                registry,
122            )
123            .unwrap(),
124            all_balance_lookup_from_total: register_int_counter_with_registry!(
125                "all_balance_lookup_from_total",
126                "Total number of all balance requests served",
127                registry,
128            )
129            .unwrap(),
130        }
131    }
132}
133
134/// The `IndexStoreCaches` struct manages `ShardedLruCache` instances to
135/// facilitate balance lookups and ownership queries.
136pub struct IndexStoreCaches {
137    per_coin_type_balance: ShardedLruCache<(IotaAddress, TypeTag), IotaResult<TotalBalance>>,
138    all_balances: ShardedLruCache<IotaAddress, IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
139    locks: MutexTable<IotaAddress>,
140}
141
142#[derive(Default)]
143pub struct IndexStoreCacheUpdates {
144    _locks: Vec<OwnedMutexGuard<()>>,
145    per_coin_type_balance_changes: Vec<((IotaAddress, TypeTag), IotaResult<TotalBalance>)>,
146    all_balance_changes: Vec<(IotaAddress, IotaResult<Arc<AllBalance>>)>,
147}
148
149/// The `IndexStoreTables` struct defines a set of `DBMaps` used to index
150/// various aspects of transaction and object data. Each field corresponds to a
151/// specific index, with keys such as `IotaAddress`, `TransactionDigest`, etc.
152/// Each mapping is configured with custom database options.
153#[derive(DBMapUtils)]
154pub struct IndexStoreTables {
155    /// Index from iota address to transactions initiated by that address.
156    #[default_options_override_fn = "transactions_from_addr_table_default_config"]
157    transactions_from_addr: DBMap<(IotaAddress, TxSequenceNumber), TransactionDigest>,
158
159    /// Index from iota address to transactions that were sent to that address.
160    #[default_options_override_fn = "transactions_to_addr_table_default_config"]
161    transactions_to_addr: DBMap<(IotaAddress, TxSequenceNumber), TransactionDigest>,
162
163    /// Index from object id to transactions that used that object id as input.
164    #[deprecated]
165    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
166
167    /// Index from object id to transactions that modified/created that object
168    /// id.
169    #[deprecated]
170    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
171
172    /// Index from package id, module and function identifier to transactions
173    /// that used that moce function call as input.
174    #[default_options_override_fn = "transactions_by_move_function_table_default_config"]
175    transactions_by_move_function:
176        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,
177
178    /// Ordering of all indexed transactions.
179    #[default_options_override_fn = "transactions_order_table_default_config"]
180    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,
181
182    /// Index from transaction digest to sequence number.
183    #[default_options_override_fn = "transactions_seq_table_default_config"]
184    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,
185
186    /// This is an index of object references to currently existing objects,
187    /// indexed by the composite key of the IotaAddress of their owner and
188    /// the object ID of the object. This composite index allows an
189    /// efficient iterator to list all objected currently owned
190    /// by a specific user, and their object reference.
191    #[default_options_override_fn = "owner_index_table_default_config"]
192    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,
193
194    #[default_options_override_fn = "coin_index_table_default_config"]
195    coin_index: DBMap<CoinIndexKey, CoinInfo>,
196
197    /// This is an index of object references to currently existing dynamic
198    /// field object, indexed by the composite key of the object ID of their
199    /// parent and the object ID of the dynamic field object. This composite
200    /// index allows an efficient iterator to list all objects currently owned
201    /// by a specific object, and their object reference.
202    #[default_options_override_fn = "dynamic_field_index_table_default_config"]
203    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,
204
205    #[default_options_override_fn = "index_table_default_config"]
206    event_order: DBMap<EventId, EventIndex>,
207
208    #[default_options_override_fn = "index_table_default_config"]
209    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
210
211    #[default_options_override_fn = "index_table_default_config"]
212    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
213
214    #[default_options_override_fn = "index_table_default_config"]
215    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
216
217    #[default_options_override_fn = "index_table_default_config"]
218    event_by_sender: DBMap<(IotaAddress, EventId), EventIndex>,
219
220    #[default_options_override_fn = "index_table_default_config"]
221    event_by_time: DBMap<(u64, EventId), EventIndex>,
222}
223
224impl IndexStoreTables {
225    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
226        &self.owner_index
227    }
228
229    pub fn coin_index(&self) -> &DBMap<CoinIndexKey, CoinInfo> {
230        &self.coin_index
231    }
232}
233
234/// The `IndexStore` enables users to access and manage indexed transaction
235/// data, including ownership and balance information for different objects and
236/// coins.
237pub struct IndexStore {
238    next_sequence_number: AtomicU64,
239    tables: IndexStoreTables,
240    caches: IndexStoreCaches,
241    metrics: Arc<IndexStoreMetrics>,
242    max_type_length: u64,
243    remove_deprecated_tables: bool,
244}
245
246// These functions are used to initialize the DB tables
247fn transactions_order_table_default_config() -> DBOptions {
248    default_db_options().disable_write_throttling()
249}
250fn transactions_seq_table_default_config() -> DBOptions {
251    default_db_options().disable_write_throttling()
252}
253fn transactions_from_addr_table_default_config() -> DBOptions {
254    default_db_options().disable_write_throttling()
255}
256fn transactions_to_addr_table_default_config() -> DBOptions {
257    default_db_options().disable_write_throttling()
258}
259fn transactions_by_move_function_table_default_config() -> DBOptions {
260    default_db_options().disable_write_throttling()
261}
262fn owner_index_table_default_config() -> DBOptions {
263    default_db_options().disable_write_throttling()
264}
265fn dynamic_field_index_table_default_config() -> DBOptions {
266    default_db_options().disable_write_throttling()
267}
268fn index_table_default_config() -> DBOptions {
269    default_db_options().disable_write_throttling()
270}
271fn coin_index_table_default_config() -> DBOptions {
272    default_db_options()
273        .optimize_for_write_throughput()
274        .optimize_for_read(
275            read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
276        )
277        .disable_write_throttling()
278}
279
280impl IndexStore {
281    pub fn new(
282        path: PathBuf,
283        registry: &Registry,
284        max_type_length: Option<u64>,
285        remove_deprecated_tables: bool,
286    ) -> Self {
287        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
288            path,
289            MetricConf::new("index"),
290            None,
291            None,
292            remove_deprecated_tables,
293        );
294        let metrics = IndexStoreMetrics::new(registry);
295        let caches = IndexStoreCaches {
296            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
297            all_balances: ShardedLruCache::new(1_000_000, 1000),
298            locks: MutexTable::new(128),
299        };
300        let next_sequence_number = tables
301            .transaction_order
302            .unbounded_iter()
303            .skip_to_last()
304            .next()
305            .map(|(seq, _)| seq + 1)
306            .unwrap_or(0)
307            .into();
308
309        Self {
310            tables,
311            next_sequence_number,
312            caches,
313            metrics: Arc::new(metrics),
314            max_type_length: max_type_length.unwrap_or(128),
315            remove_deprecated_tables,
316        }
317    }
318
319    pub fn tables(&self) -> &IndexStoreTables {
320        &self.tables
321    }
322
323    pub async fn index_coin(
324        &self,
325        digest: &TransactionDigest,
326        batch: &mut DBBatch,
327        object_index_changes: &ObjectIndexChanges,
328        tx_coins: Option<TxCoins>,
329    ) -> IotaResult<IndexStoreCacheUpdates> {
330        // In production if this code path is hit, we should expect `tx_coins` to not be
331        // None. However, in many tests today we do not distinguish validator
332        // and/or fullnode, so we gracefully exist here.
333        if tx_coins.is_none() {
334            return Ok(IndexStoreCacheUpdates::default());
335        }
336        // Acquire locks on changed coin owners
337        let mut addresses: HashSet<IotaAddress> = HashSet::new();
338        addresses.extend(
339            object_index_changes
340                .deleted_owners
341                .iter()
342                .map(|(owner, _)| *owner),
343        );
344        addresses.extend(
345            object_index_changes
346                .new_owners
347                .iter()
348                .map(|((owner, _), _)| *owner),
349        );
350        let _locks = self.caches.locks.acquire_locks(addresses.into_iter()).await;
351        let mut balance_changes: HashMap<IotaAddress, HashMap<TypeTag, TotalBalance>> =
352            HashMap::new();
353        // Index coin info
354        let (input_coins, written_coins) = tx_coins.unwrap();
355        // 1. Delete old owner if the object is deleted or transferred to a new owner,
356        // by looking at `object_index_changes.deleted_owners`.
357        // Objects in `deleted_owners` must be coin type (see
358        // `AuthorityState::commit_certificate`).
359        let coin_delete_keys = object_index_changes
360            .deleted_owners
361            .iter()
362            .filter_map(|(owner, obj_id)| {
363                let object = input_coins.get(obj_id).or(written_coins.get(obj_id))?;
364                let coin_type_tag = object.coin_type_maybe().unwrap_or_else(|| {
365                    panic!(
366                        "object_id: {:?} is not a coin type, input_coins: {:?}, written_coins: {:?}, tx_digest: {:?}",
367                        obj_id, input_coins, written_coins, digest
368                    )
369                });
370                let map = balance_changes.entry(*owner).or_default();
371                let entry = map.entry(coin_type_tag.clone()).or_insert(TotalBalance {
372                    num_coins: 0,
373                    balance: 0
374                });
375                if let Ok(Some(coin_info)) = &self.tables.coin_index.get(&(*owner, coin_type_tag.to_string(), *obj_id)) {
376                    entry.num_coins -= 1;
377                    entry.balance -= coin_info.balance as i128;
378                }
379                Some((*owner, coin_type_tag.to_string(), *obj_id))
380            }).collect::<Vec<_>>();
381        trace!(
382            tx_digest=?digest,
383            "coin_delete_keys: {:?}",
384            coin_delete_keys,
385        );
386        batch.delete_batch(&self.tables.coin_index, coin_delete_keys.into_iter())?;
387
388        // 2. Upsert new owner, by looking at `object_index_changes.new_owners`.
389        // For a object to appear in `new_owners`, it must be owned by `Owner::Address`
390        // after the tx. It also must not be deleted, hence appear in
391        // written_coins (see `AuthorityState::commit_certificate`) It also must
392        // be a coin type (see `AuthorityState::commit_certificate`).
393        // Here the coin could be transferred to a new address, to simply have the
394        // metadata changed (digest, balance etc) due to a successful or failed
395        // transaction.
396        let coin_add_keys = object_index_changes
397        .new_owners
398        .iter()
399        .filter_map(|((owner, obj_id), obj_info)| {
400            // If it's in written_coins, then it's not a coin. Skip it.
401            let obj = written_coins.get(obj_id)?;
402            let coin_type_tag = obj.coin_type_maybe().unwrap_or_else(|| {
403                panic!(
404                    "object_id: {:?} in written_coins is not a coin type, written_coins: {:?}, tx_digest: {:?}",
405                    obj_id, written_coins, digest
406                )
407            });
408            let coin = obj.as_coin_maybe().unwrap_or_else(|| {
409                panic!(
410                    "object_id: {:?} in written_coins cannot be deserialized as a Coin, written_coins: {:?}, tx_digest: {:?}",
411                    obj_id, written_coins, digest
412                )
413            });
414            let map = balance_changes.entry(*owner).or_default();
415            let entry = map.entry(coin_type_tag.clone()).or_insert(TotalBalance {
416                num_coins: 0,
417                balance: 0
418            });
419            let result = self.tables.coin_index.get(&(*owner, coin_type_tag.to_string(), *obj_id));
420            if let Ok(Some(coin_info)) = &result {
421                entry.balance -= coin_info.balance as i128;
422                entry.balance += coin.balance.value() as i128;
423            } else if let Ok(None) = &result {
424                entry.num_coins += 1;
425                entry.balance += coin.balance.value() as i128;
426            }
427            Some(((*owner, coin_type_tag.to_string(), *obj_id), (CoinInfo {version: obj_info.version, digest: obj_info.digest, balance: coin.balance.value(), previous_transaction: *digest})))
428        }).collect::<Vec<_>>();
429        trace!(
430            tx_digest=?digest,
431            "coin_add_keys: {:?}",
432            coin_add_keys,
433        );
434
435        batch.insert_batch(&self.tables.coin_index, coin_add_keys.into_iter())?;
436
437        let per_coin_type_balance_changes: Vec<_> = balance_changes
438            .iter()
439            .flat_map(|(address, balance_map)| {
440                balance_map.iter().map(|(type_tag, balance)| {
441                    (
442                        (*address, type_tag.clone()),
443                        Ok::<TotalBalance, IotaError>(*balance),
444                    )
445                })
446            })
447            .collect();
448        let all_balance_changes: Vec<_> = balance_changes
449            .into_iter()
450            .map(|(address, balance_map)| {
451                (
452                    address,
453                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, IotaError>(Arc::new(balance_map)),
454                )
455            })
456            .collect();
457        let cache_updates = IndexStoreCacheUpdates {
458            _locks,
459            per_coin_type_balance_changes,
460            all_balance_changes,
461        };
462        Ok(cache_updates)
463    }
464
465    /// Indexes a transaction by updating various indices in the `IndexStore`
466    /// with the provided transaction details.
467    pub async fn index_tx(
468        &self,
469        sender: IotaAddress,
470        active_inputs: impl Iterator<Item = ObjectID>,
471        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
472        move_functions: impl Iterator<Item = (ObjectID, Identifier, Identifier)> + Clone,
473        events: &TransactionEvents,
474        object_index_changes: ObjectIndexChanges,
475        digest: &TransactionDigest,
476        timestamp_ms: u64,
477        tx_coins: Option<TxCoins>,
478    ) -> IotaResult<u64> {
479        let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
480        let mut batch = self.tables.transactions_from_addr.batch();
481
482        batch.insert_batch(
483            &self.tables.transaction_order,
484            std::iter::once((sequence, *digest)),
485        )?;
486
487        batch.insert_batch(
488            &self.tables.transactions_seq,
489            std::iter::once((*digest, sequence)),
490        )?;
491
492        batch.insert_batch(
493            &self.tables.transactions_from_addr,
494            std::iter::once(((sender, sequence), *digest)),
495        )?;
496
497        #[allow(deprecated)]
498        if !self.remove_deprecated_tables {
499            batch.insert_batch(
500                &self.tables.transactions_by_input_object_id,
501                active_inputs.map(|id| ((id, sequence), *digest)),
502            )?;
503
504            batch.insert_batch(
505                &self.tables.transactions_by_mutated_object_id,
506                mutated_objects
507                    .clone()
508                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
509            )?;
510        }
511
512        batch.insert_batch(
513            &self.tables.transactions_by_move_function,
514            move_functions.map(|(obj_id, module, function)| {
515                (
516                    (obj_id, module.to_string(), function.to_string(), sequence),
517                    *digest,
518                )
519            }),
520        )?;
521
522        batch.insert_batch(
523            &self.tables.transactions_to_addr,
524            mutated_objects.filter_map(|(_, owner)| {
525                owner
526                    .get_address_owner_address()
527                    .ok()
528                    .map(|addr| ((addr, sequence), digest))
529            }),
530        )?;
531
532        // Coin Index
533        let cache_updates = self
534            .index_coin(digest, &mut batch, &object_index_changes, tx_coins)
535            .await?;
536
537        // Owner index
538        batch.delete_batch(
539            &self.tables.owner_index,
540            object_index_changes.deleted_owners.into_iter(),
541        )?;
542        batch.delete_batch(
543            &self.tables.dynamic_field_index,
544            object_index_changes.deleted_dynamic_fields.into_iter(),
545        )?;
546
547        batch.insert_batch(
548            &self.tables.owner_index,
549            object_index_changes.new_owners.into_iter(),
550        )?;
551
552        batch.insert_batch(
553            &self.tables.dynamic_field_index,
554            object_index_changes.new_dynamic_fields.into_iter(),
555        )?;
556
557        // events
558        let event_digest = events.digest();
559        batch.insert_batch(
560            &self.tables.event_order,
561            events
562                .data
563                .iter()
564                .enumerate()
565                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
566        )?;
567        batch.insert_batch(
568            &self.tables.event_by_move_module,
569            events
570                .data
571                .iter()
572                .enumerate()
573                .map(|(i, e)| {
574                    (
575                        i,
576                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
577                    )
578                })
579                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
580        )?;
581        batch.insert_batch(
582            &self.tables.event_by_sender,
583            events.data.iter().enumerate().map(|(i, e)| {
584                (
585                    (e.sender, (sequence, i)),
586                    (event_digest, *digest, timestamp_ms),
587                )
588            }),
589        )?;
590        batch.insert_batch(
591            &self.tables.event_by_move_event,
592            events.data.iter().enumerate().map(|(i, e)| {
593                (
594                    (e.type_.clone(), (sequence, i)),
595                    (event_digest, *digest, timestamp_ms),
596                )
597            }),
598        )?;
599
600        batch.insert_batch(
601            &self.tables.event_by_time,
602            events.data.iter().enumerate().map(|(i, _)| {
603                (
604                    (timestamp_ms, (sequence, i)),
605                    (event_digest, *digest, timestamp_ms),
606                )
607            }),
608        )?;
609
610        batch.insert_batch(
611            &self.tables.event_by_event_module,
612            events.data.iter().enumerate().map(|(i, e)| {
613                (
614                    (
615                        ModuleId::new(e.type_.address, e.type_.module.clone()),
616                        (sequence, i),
617                    ),
618                    (event_digest, *digest, timestamp_ms),
619                )
620            }),
621        )?;
622
623        let invalidate_caches =
624            read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
625
626        if invalidate_caches {
627            // Invalidate cache before writing to db so we always serve latest values
628            self.invalidate_per_coin_type_cache(
629                cache_updates
630                    .per_coin_type_balance_changes
631                    .iter()
632                    .map(|x| x.0.clone()),
633            )
634            .await?;
635            self.invalidate_all_balance_cache(
636                cache_updates.all_balance_changes.iter().map(|x| x.0),
637            )
638            .await?;
639        }
640
641        batch.write()?;
642
643        if !invalidate_caches {
644            // We cannot update the cache before updating the db or else on failing to write
645            // to db we will update the cache (when we retry to index this
646            // transaction again we would have updated the cache twice).
647            // However, this only means cache is eventually consistent with
648            // the db (within a very short delay)
649            self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)
650                .await?;
651            self.update_all_balance_cache(cache_updates.all_balance_changes)
652                .await?;
653        }
654        Ok(sequence)
655    }
656
657    pub fn next_sequence_number(&self) -> TxSequenceNumber {
658        self.next_sequence_number.load(Ordering::SeqCst) + 1
659    }
660
661    pub fn get_transactions(
662        &self,
663        filter: Option<TransactionFilter>,
664        cursor: Option<TransactionDigest>,
665        limit: Option<usize>,
666        reverse: bool,
667    ) -> IotaResult<Vec<TransactionDigest>> {
668        // Lookup TransactionDigest sequence number,
669        let cursor = if let Some(cursor) = cursor {
670            Some(
671                self.get_transaction_seq(&cursor)?
672                    .ok_or(IotaError::TransactionNotFound { digest: cursor })?,
673            )
674        } else {
675            None
676        };
677        match filter {
678            Some(TransactionFilter::MoveFunction {
679                package,
680                module,
681                function,
682            }) => Ok(self.get_transactions_by_move_function(
683                package, module, function, cursor, limit, reverse,
684            )?),
685            Some(TransactionFilter::InputObject(object_id)) => {
686                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
687            }
688            Some(TransactionFilter::ChangedObject(object_id)) => {
689                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
690            }
691            Some(TransactionFilter::FromAddress(address)) => {
692                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
693            }
694            Some(TransactionFilter::ToAddress(address)) => {
695                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
696            }
697            // NOTE: filter via checkpoint sequence number is implemented in
698            // `get_transactions` of authority.rs.
699            Some(_) => Err(IotaError::UserInput {
700                error: UserInputError::Unsupported(format!("{:?}", filter)),
701            }),
702            None => {
703                let iter = self.tables.transaction_order.unbounded_iter();
704
705                if reverse {
706                    let iter = iter
707                        .skip_prior_to(&cursor.unwrap_or(TxSequenceNumber::MAX))?
708                        .reverse()
709                        .skip(usize::from(cursor.is_some()))
710                        .map(|(_, digest)| digest);
711                    if let Some(limit) = limit {
712                        Ok(iter.take(limit).collect())
713                    } else {
714                        Ok(iter.collect())
715                    }
716                } else {
717                    let iter = iter
718                        .skip_to(&cursor.unwrap_or(TxSequenceNumber::MIN))?
719                        .skip(usize::from(cursor.is_some()))
720                        .map(|(_, digest)| digest);
721                    if let Some(limit) = limit {
722                        Ok(iter.take(limit).collect())
723                    } else {
724                        Ok(iter.collect())
725                    }
726                }
727            }
728        }
729    }
730
731    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
732        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
733        key: KeyT,
734        cursor: Option<TxSequenceNumber>,
735        limit: Option<usize>,
736        reverse: bool,
737    ) -> IotaResult<Vec<TransactionDigest>> {
738        Ok(if reverse {
739            let iter = index
740                .unbounded_iter()
741                .skip_prior_to(&(key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX)))?
742                .reverse()
743                // skip one more if exclusive cursor is Some
744                .skip(usize::from(cursor.is_some()))
745                .take_while(|((id, _), _)| *id == key)
746                .map(|(_, digest)| digest);
747            if let Some(limit) = limit {
748                iter.take(limit).collect()
749            } else {
750                iter.collect()
751            }
752        } else {
753            let iter = index
754                .unbounded_iter()
755                .skip_to(&(key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN)))?
756                // skip one more if exclusive cursor is Some
757                .skip(usize::from(cursor.is_some()))
758                .take_while(|((id, _), _)| *id == key)
759                .map(|(_, digest)| digest);
760            if let Some(limit) = limit {
761                iter.take(limit).collect()
762            } else {
763                iter.collect()
764            }
765        })
766    }
767
768    pub fn get_transactions_by_input_object(
769        &self,
770        input_object: ObjectID,
771        cursor: Option<TxSequenceNumber>,
772        limit: Option<usize>,
773        reverse: bool,
774    ) -> IotaResult<Vec<TransactionDigest>> {
775        if self.remove_deprecated_tables {
776            return Ok(vec![]);
777        }
778        #[allow(deprecated)]
779        Self::get_transactions_from_index(
780            &self.tables.transactions_by_input_object_id,
781            input_object,
782            cursor,
783            limit,
784            reverse,
785        )
786    }
787
788    pub fn get_transactions_by_mutated_object(
789        &self,
790        mutated_object: ObjectID,
791        cursor: Option<TxSequenceNumber>,
792        limit: Option<usize>,
793        reverse: bool,
794    ) -> IotaResult<Vec<TransactionDigest>> {
795        if self.remove_deprecated_tables {
796            return Ok(vec![]);
797        }
798        #[allow(deprecated)]
799        Self::get_transactions_from_index(
800            &self.tables.transactions_by_mutated_object_id,
801            mutated_object,
802            cursor,
803            limit,
804            reverse,
805        )
806    }
807
808    pub fn get_transactions_from_addr(
809        &self,
810        addr: IotaAddress,
811        cursor: Option<TxSequenceNumber>,
812        limit: Option<usize>,
813        reverse: bool,
814    ) -> IotaResult<Vec<TransactionDigest>> {
815        Self::get_transactions_from_index(
816            &self.tables.transactions_from_addr,
817            addr,
818            cursor,
819            limit,
820            reverse,
821        )
822    }
823
824    pub fn get_transactions_by_move_function(
825        &self,
826        package: ObjectID,
827        module: Option<String>,
828        function: Option<String>,
829        cursor: Option<TxSequenceNumber>,
830        limit: Option<usize>,
831        reverse: bool,
832    ) -> IotaResult<Vec<TransactionDigest>> {
833        // If we are passed a function with no module return a UserInputError
834        if function.is_some() && module.is_none() {
835            return Err(IotaError::UserInput {
836                error: UserInputError::MoveFunctionInput(
837                    "Cannot supply function without supplying module".to_string(),
838                ),
839            });
840        }
841
842        // We cannot have a cursor without filling out the other keys.
843        if cursor.is_some() && (module.is_none() || function.is_none()) {
844            return Err(IotaError::UserInput {
845                error: UserInputError::MoveFunctionInput(
846                    "Cannot supply cursor without supplying module and function".to_string(),
847                ),
848            });
849        }
850
851        let cursor_val = cursor.unwrap_or(if reverse {
852            TxSequenceNumber::MAX
853        } else {
854            TxSequenceNumber::MIN
855        });
856
857        let max_string = "Z".repeat(self.max_type_length.try_into().unwrap());
858        let module_val = module.clone().unwrap_or(if reverse {
859            max_string.clone()
860        } else {
861            "".to_string()
862        });
863
864        let function_val =
865            function
866                .clone()
867                .unwrap_or(if reverse { max_string } else { "".to_string() });
868
869        let key = (package, module_val, function_val, cursor_val);
870        let iter = self.tables.transactions_by_move_function.unbounded_iter();
871        Ok(if reverse {
872            let iter = iter
873                .skip_prior_to(&key)?
874                .reverse()
875                // skip one more if exclusive cursor is Some
876                .skip(usize::from(cursor.is_some()))
877                .take_while(|((id, m, f, _), _)| {
878                    *id == package
879                        && module.as_ref().map(|x| x == m).unwrap_or(true)
880                        && function.as_ref().map(|x| x == f).unwrap_or(true)
881                })
882                .map(|(_, digest)| digest);
883            if let Some(limit) = limit {
884                iter.take(limit).collect()
885            } else {
886                iter.collect()
887            }
888        } else {
889            let iter = iter
890                .skip_to(&key)?
891                // skip one more if exclusive cursor is Some
892                .skip(usize::from(cursor.is_some()))
893                .take_while(|((id, m, f, _), _)| {
894                    *id == package
895                        && module.as_ref().map(|x| x == m).unwrap_or(true)
896                        && function.as_ref().map(|x| x == f).unwrap_or(true)
897                })
898                .map(|(_, digest)| digest);
899            if let Some(limit) = limit {
900                iter.take(limit).collect()
901            } else {
902                iter.collect()
903            }
904        })
905    }
906
907    pub fn get_transactions_to_addr(
908        &self,
909        addr: IotaAddress,
910        cursor: Option<TxSequenceNumber>,
911        limit: Option<usize>,
912        reverse: bool,
913    ) -> IotaResult<Vec<TransactionDigest>> {
914        Self::get_transactions_from_index(
915            &self.tables.transactions_to_addr,
916            addr,
917            cursor,
918            limit,
919            reverse,
920        )
921    }
922
923    pub fn get_transaction_seq(
924        &self,
925        digest: &TransactionDigest,
926    ) -> IotaResult<Option<TxSequenceNumber>> {
927        Ok(self.tables.transactions_seq.get(digest)?)
928    }
929
930    pub fn all_events(
931        &self,
932        tx_seq: TxSequenceNumber,
933        event_seq: usize,
934        limit: usize,
935        descending: bool,
936    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
937        Ok(if descending {
938            self.tables
939                .event_order
940                .unbounded_iter()
941                .skip_prior_to(&(tx_seq, event_seq))?
942                .reverse()
943                .take(limit)
944                .map(|((_, event_seq), (digest, tx_digest, time))| {
945                    (digest, tx_digest, event_seq, time)
946                })
947                .collect()
948        } else {
949            self.tables
950                .event_order
951                .unbounded_iter()
952                .skip_to(&(tx_seq, event_seq))?
953                .take(limit)
954                .map(|((_, event_seq), (digest, tx_digest, time))| {
955                    (digest, tx_digest, event_seq, time)
956                })
957                .collect()
958        })
959    }
960
961    pub fn events_by_transaction(
962        &self,
963        digest: &TransactionDigest,
964        tx_seq: TxSequenceNumber,
965        event_seq: usize,
966        limit: usize,
967        descending: bool,
968    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
969        let seq = self
970            .get_transaction_seq(digest)?
971            .ok_or(IotaError::TransactionNotFound { digest: *digest })?;
972        Ok(if descending {
973            self.tables
974                .event_order
975                .unbounded_iter()
976                .skip_prior_to(&(min(tx_seq, seq), event_seq))?
977                .reverse()
978                .take_while(|((tx, _), _)| tx == &seq)
979                .take(limit)
980                .map(|((_, event_seq), (digest, tx_digest, time))| {
981                    (digest, tx_digest, event_seq, time)
982                })
983                .collect()
984        } else {
985            self.tables
986                .event_order
987                .unbounded_iter()
988                .skip_to(&(max(tx_seq, seq), event_seq))?
989                .take_while(|((tx, _), _)| tx == &seq)
990                .take(limit)
991                .map(|((_, event_seq), (digest, tx_digest, time))| {
992                    (digest, tx_digest, event_seq, time)
993                })
994                .collect()
995        })
996    }
997
998    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
999        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1000        key: &KeyT,
1001        tx_seq: TxSequenceNumber,
1002        event_seq: usize,
1003        limit: usize,
1004        descending: bool,
1005    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1006        Ok(if descending {
1007            index
1008                .unbounded_iter()
1009                .skip_prior_to(&(key.clone(), (tx_seq, event_seq)))?
1010                .reverse()
1011                .take_while(|((m, _), _)| m == key)
1012                .take(limit)
1013                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1014                    (digest, tx_digest, event_seq, time)
1015                })
1016                .collect()
1017        } else {
1018            index
1019                .unbounded_iter()
1020                .skip_to(&(key.clone(), (tx_seq, event_seq)))?
1021                .take_while(|((m, _), _)| m == key)
1022                .take(limit)
1023                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1024                    (digest, tx_digest, event_seq, time)
1025                })
1026                .collect()
1027        })
1028    }
1029
1030    pub fn events_by_module_id(
1031        &self,
1032        module: &ModuleId,
1033        tx_seq: TxSequenceNumber,
1034        event_seq: usize,
1035        limit: usize,
1036        descending: bool,
1037    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1038        Self::get_event_from_index(
1039            &self.tables.event_by_move_module,
1040            module,
1041            tx_seq,
1042            event_seq,
1043            limit,
1044            descending,
1045        )
1046    }
1047
1048    pub fn events_by_move_event_struct_name(
1049        &self,
1050        struct_name: &StructTag,
1051        tx_seq: TxSequenceNumber,
1052        event_seq: usize,
1053        limit: usize,
1054        descending: bool,
1055    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1056        Self::get_event_from_index(
1057            &self.tables.event_by_move_event,
1058            struct_name,
1059            tx_seq,
1060            event_seq,
1061            limit,
1062            descending,
1063        )
1064    }
1065
1066    pub fn events_by_move_event_module(
1067        &self,
1068        module_id: &ModuleId,
1069        tx_seq: TxSequenceNumber,
1070        event_seq: usize,
1071        limit: usize,
1072        descending: bool,
1073    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1074        Self::get_event_from_index(
1075            &self.tables.event_by_event_module,
1076            module_id,
1077            tx_seq,
1078            event_seq,
1079            limit,
1080            descending,
1081        )
1082    }
1083
1084    pub fn events_by_sender(
1085        &self,
1086        sender: &IotaAddress,
1087        tx_seq: TxSequenceNumber,
1088        event_seq: usize,
1089        limit: usize,
1090        descending: bool,
1091    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1092        Self::get_event_from_index(
1093            &self.tables.event_by_sender,
1094            sender,
1095            tx_seq,
1096            event_seq,
1097            limit,
1098            descending,
1099        )
1100    }
1101
1102    pub fn event_iterator(
1103        &self,
1104        start_time: u64,
1105        end_time: u64,
1106        tx_seq: TxSequenceNumber,
1107        event_seq: usize,
1108        limit: usize,
1109        descending: bool,
1110    ) -> IotaResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1111        Ok(if descending {
1112            self.tables
1113                .event_by_time
1114                .unbounded_iter()
1115                .skip_prior_to(&(end_time, (tx_seq, event_seq)))?
1116                .reverse()
1117                .take_while(|((m, _), _)| m >= &start_time)
1118                .take(limit)
1119                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1120                    (digest, tx_digest, event_seq, time)
1121                })
1122                .collect()
1123        } else {
1124            self.tables
1125                .event_by_time
1126                .unbounded_iter()
1127                .skip_to(&(start_time, (tx_seq, event_seq)))?
1128                .take_while(|((m, _), _)| m <= &end_time)
1129                .take(limit)
1130                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1131                    (digest, tx_digest, event_seq, time)
1132                })
1133                .collect()
1134        })
1135    }
1136
1137    pub fn get_dynamic_fields_iterator(
1138        &self,
1139        object: ObjectID,
1140        cursor: Option<ObjectID>,
1141    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1142    {
1143        debug!(?object, "get_dynamic_fields");
1144        let iter_lower_bound = (object, ObjectID::ZERO);
1145        let iter_upper_bound = (object, ObjectID::MAX);
1146        Ok(self
1147            .tables
1148            .dynamic_field_index
1149            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
1150            // The object id 0 is the smallest possible
1151            .skip_to(&(object, cursor.unwrap_or(ObjectID::ZERO)))?
1152            // skip an extra b/c the cursor is exclusive
1153            .skip(usize::from(cursor.is_some()))
1154            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
1155            .map_ok(|((_, c), object_info)| (c, object_info)))
1156    }
1157
1158    pub fn get_dynamic_field_object_id(
1159        &self,
1160        object: ObjectID,
1161        name_type: TypeTag,
1162        name_bcs_bytes: &[u8],
1163    ) -> IotaResult<Option<ObjectID>> {
1164        debug!(?object, "get_dynamic_field_object_id");
1165        let dynamic_field_id =
1166            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1167                |e| {
1168                    IotaError::Unknown(format!(
1169                        "Unable to generate dynamic field id. Got error: {e:?}"
1170                    ))
1171                },
1172            )?;
1173
1174        if let Some(info) = self
1175            .tables
1176            .dynamic_field_index
1177            .get(&(object, dynamic_field_id))?
1178        {
1179            // info.object_id != dynamic_field_id ==> is_wrapper
1180            debug_assert!(
1181                info.object_id == dynamic_field_id
1182                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1183            );
1184            return Ok(Some(info.object_id));
1185        }
1186
1187        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1188        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1189        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1190            object,
1191            &dynamic_object_field_type,
1192            name_bcs_bytes,
1193        )
1194        .map_err(|e| {
1195            IotaError::Unknown(format!(
1196                "Unable to generate dynamic field id. Got error: {e:?}"
1197            ))
1198        })?;
1199        if let Some(info) = self
1200            .tables
1201            .dynamic_field_index
1202            .get(&(object, dynamic_object_field_id))?
1203        {
1204            return Ok(Some(info.object_id));
1205        }
1206
1207        Ok(None)
1208    }
1209
1210    pub fn get_owner_objects(
1211        &self,
1212        owner: IotaAddress,
1213        cursor: Option<ObjectID>,
1214        limit: usize,
1215        filter: Option<IotaObjectDataFilter>,
1216    ) -> IotaResult<Vec<ObjectInfo>> {
1217        let cursor = match cursor {
1218            Some(cursor) => cursor,
1219            None => ObjectID::ZERO,
1220        };
1221        Ok(self
1222            .get_owner_objects_iterator(owner, cursor, filter)?
1223            .take(limit)
1224            .collect())
1225    }
1226
1227    pub fn get_owned_coins_iterator(
1228        coin_index: &DBMap<CoinIndexKey, CoinInfo>,
1229        owner: IotaAddress,
1230        coin_type_tag: Option<String>,
1231    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
1232        let all_coins = coin_type_tag.is_none();
1233        let starting_coin_type =
1234            coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1235        Ok(coin_index
1236            .unbounded_iter()
1237            .skip_to(&(owner, starting_coin_type.clone(), ObjectID::ZERO))?
1238            .take_while(move |((addr, coin_type, _), _)| {
1239                if addr != &owner {
1240                    return false;
1241                }
1242                if !all_coins && &starting_coin_type != coin_type {
1243                    return false;
1244                }
1245                true
1246            })
1247            .map(|((_, coin_type, obj_id), coin)| (coin_type, obj_id, coin)))
1248    }
1249
1250    pub fn get_owned_coins_iterator_with_cursor(
1251        &self,
1252        owner: IotaAddress,
1253        cursor: (String, ObjectID),
1254        limit: usize,
1255        one_coin_type_only: bool,
1256    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
1257        let (starting_coin_type, starting_object_id) = cursor;
1258        Ok(self
1259            .tables
1260            .coin_index
1261            .unbounded_iter()
1262            .skip_to(&(owner, starting_coin_type.clone(), starting_object_id))?
1263            .filter(move |((_, _, obj_id), _)| obj_id != &starting_object_id)
1264            .enumerate()
1265            .take_while(move |(index, ((addr, coin_type, _), _))| {
1266                if *index >= limit {
1267                    return false;
1268                }
1269                if addr != &owner {
1270                    return false;
1271                }
1272                if one_coin_type_only && &starting_coin_type != coin_type {
1273                    return false;
1274                }
1275                true
1276            })
1277            .map(|(_, ((_, coin_type, obj_id), coin))| (coin_type, obj_id, coin)))
1278    }
1279
1280    /// starting_object_id can be used to implement pagination, where a client
1281    /// remembers the last object id of each page, and use it to query the
1282    /// next page.
1283    pub fn get_owner_objects_iterator(
1284        &self,
1285        owner: IotaAddress,
1286        starting_object_id: ObjectID,
1287        filter: Option<IotaObjectDataFilter>,
1288    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
1289        Ok(self
1290            .tables
1291            .owner_index
1292            .unbounded_iter()
1293            // The object id 0 is the smallest possible
1294            .skip_to(&(owner, starting_object_id))?
1295            .skip(usize::from(starting_object_id != ObjectID::ZERO))
1296            .take_while(move |((address_owner, _), _)| address_owner == &owner)
1297            .filter(move |(_, o)| {
1298                if let Some(filter) = filter.as_ref() {
1299                    filter.matches(o)
1300                } else {
1301                    true
1302                }
1303            })
1304            .map(|(_, object_info)| object_info))
1305    }
1306
1307    pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> IotaResult {
1308        let mut batch = self.tables.owner_index.batch();
1309        batch.insert_batch(
1310            &self.tables.owner_index,
1311            object_index_changes.new_owners.into_iter(),
1312        )?;
1313        batch.insert_batch(
1314            &self.tables.dynamic_field_index,
1315            object_index_changes.new_dynamic_fields.into_iter(),
1316        )?;
1317        batch.write()?;
1318        Ok(())
1319    }
1320
1321    pub fn is_empty(&self) -> bool {
1322        self.tables.owner_index.is_empty()
1323    }
1324
1325    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
1326        // We are checkpointing the whole db
1327        self.tables
1328            .transactions_from_addr
1329            .checkpoint_db(path)
1330            .map_err(Into::into)
1331    }
1332
1333    /// This method first gets the balance from `per_coin_type_balance` cache.
1334    /// On a cache miss, it gets the balance for passed in `coin_type` from
1335    /// the `all_balance` cache. Only on the second cache miss, we go to the
1336    /// database (expensive) and update the cache. Notice that db read is
1337    /// done with `spawn_blocking` as that is expected to block
1338    pub async fn get_balance(
1339        &self,
1340        owner: IotaAddress,
1341        coin_type: TypeTag,
1342    ) -> IotaResult<TotalBalance> {
1343        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1344        let cloned_coin_type = coin_type.clone();
1345        let metrics_cloned = self.metrics.clone();
1346        let coin_index_cloned = self.tables.coin_index.clone();
1347        if force_disable_cache {
1348            return spawn_blocking(move || {
1349                Self::get_balance_from_db(
1350                    metrics_cloned,
1351                    coin_index_cloned,
1352                    owner,
1353                    cloned_coin_type,
1354                )
1355            })
1356            .await
1357            .unwrap()
1358            .map_err(|e| IotaError::Execution(format!("Failed to read balance frm DB: {:?}", e)));
1359        }
1360
1361        self.metrics.balance_lookup_from_total.inc();
1362
1363        let balance = self
1364            .caches
1365            .per_coin_type_balance
1366            .get(&(owner, coin_type.clone()))
1367            .await;
1368        if let Some(balance) = balance {
1369            return balance;
1370        }
1371        // cache miss, lookup in all balance cache
1372        let all_balance = self.caches.all_balances.get(&owner.clone()).await;
1373        if let Some(Ok(all_balance)) = all_balance {
1374            if let Some(balance) = all_balance.get(&coin_type) {
1375                return Ok(*balance);
1376            }
1377        }
1378        let cloned_coin_type = coin_type.clone();
1379        let metrics_cloned = self.metrics.clone();
1380        let coin_index_cloned = self.tables.coin_index.clone();
1381        self.caches
1382            .per_coin_type_balance
1383            .get_with((owner, coin_type), async move {
1384                spawn_blocking(move || {
1385                    Self::get_balance_from_db(
1386                        metrics_cloned,
1387                        coin_index_cloned,
1388                        owner,
1389                        cloned_coin_type,
1390                    )
1391                })
1392                .await
1393                .unwrap()
1394                .map_err(|e| {
1395                    IotaError::Execution(format!("Failed to read balance frm DB: {:?}", e))
1396                })
1397            })
1398            .await
1399    }
1400
1401    /// This method gets the balance for all coin types from the `all_balance`
1402    /// cache. On a cache miss, we go to the database (expensive) and update
1403    /// the cache. This cache is dual purpose in the sense that it not only
1404    /// serves `get_AllBalance()` calls but is also used for serving
1405    /// `get_Balance()` queries. Notice that db read is performed with
1406    /// `spawn_blocking` as that is expected to block
1407    pub async fn get_all_balance(
1408        &self,
1409        owner: IotaAddress,
1410    ) -> IotaResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1411        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1412        let metrics_cloned = self.metrics.clone();
1413        let coin_index_cloned = self.tables.coin_index.clone();
1414        if force_disable_cache {
1415            return spawn_blocking(move || {
1416                Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
1417            })
1418            .await
1419            .unwrap()
1420            .map_err(|e| {
1421                IotaError::Execution(format!("Failed to read all balance from DB: {:?}", e))
1422            });
1423        }
1424
1425        self.metrics.all_balance_lookup_from_total.inc();
1426        let metrics_cloned = self.metrics.clone();
1427        let coin_index_cloned = self.tables.coin_index.clone();
1428        self.caches
1429            .all_balances
1430            .get_with(owner, async move {
1431                spawn_blocking(move || {
1432                    Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
1433                })
1434                .await
1435                .unwrap()
1436                .map_err(|e| {
1437                    IotaError::Execution(format!("Failed to read all balance from DB: {:?}", e))
1438                })
1439            })
1440            .await
1441            .map(|mut balances_map| {
1442                Arc::make_mut(&mut balances_map)
1443                    .retain(|_, TotalBalance { num_coins, .. }| *num_coins > 0);
1444                balances_map
1445            })
1446    }
1447
1448    /// Read balance for a `IotaAddress` and `CoinType` from the backend
1449    /// database
1450    pub fn get_balance_from_db(
1451        metrics: Arc<IndexStoreMetrics>,
1452        coin_index: DBMap<CoinIndexKey, CoinInfo>,
1453        owner: IotaAddress,
1454        coin_type: TypeTag,
1455    ) -> IotaResult<TotalBalance> {
1456        metrics.balance_lookup_from_db.inc();
1457        let coin_type_str = coin_type.to_string();
1458        let coins =
1459            Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?
1460                .map(|(_coin_type, obj_id, coin)| (coin_type_str.clone(), obj_id, coin));
1461
1462        let mut balance = 0i128;
1463        let mut num_coins = 0;
1464        for (_coin_type, _obj_id, coin_info) in coins {
1465            balance += coin_info.balance as i128;
1466            num_coins += 1;
1467        }
1468        Ok(TotalBalance { balance, num_coins })
1469    }
1470
1471    /// Read all balances for a `IotaAddress` from the backend database
1472    pub fn get_all_balances_from_db(
1473        metrics: Arc<IndexStoreMetrics>,
1474        coin_index: DBMap<CoinIndexKey, CoinInfo>,
1475        owner: IotaAddress,
1476    ) -> IotaResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1477        metrics.all_balance_lookup_from_db.inc();
1478        let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
1479        let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
1480            .chunk_by(|(coin_type, _obj_id, _coin)| coin_type.clone());
1481        for (coin_type, coins) in &coins {
1482            let mut total_balance = 0i128;
1483            let mut coin_object_count = 0;
1484            for (_coin_type, _obj_id, coin_info) in coins {
1485                total_balance += coin_info.balance as i128;
1486                coin_object_count += 1;
1487            }
1488            let coin_type = TypeTag::Struct(Box::new(parse_iota_struct_tag(&coin_type).map_err(
1489                |e| IotaError::Execution(format!("Failed to parse event sender address: {:?}", e)),
1490            )?));
1491            balances.insert(
1492                coin_type,
1493                TotalBalance {
1494                    num_coins: coin_object_count,
1495                    balance: total_balance,
1496                },
1497            );
1498        }
1499        Ok(Arc::new(balances))
1500    }
1501
1502    async fn invalidate_per_coin_type_cache(
1503        &self,
1504        keys: impl IntoIterator<Item = (IotaAddress, TypeTag)>,
1505    ) -> IotaResult {
1506        self.caches
1507            .per_coin_type_balance
1508            .batch_invalidate(keys)
1509            .await;
1510        Ok(())
1511    }
1512
1513    async fn invalidate_all_balance_cache(
1514        &self,
1515        addresses: impl IntoIterator<Item = IotaAddress>,
1516    ) -> IotaResult {
1517        self.caches.all_balances.batch_invalidate(addresses).await;
1518        Ok(())
1519    }
1520
1521    async fn update_per_coin_type_cache(
1522        &self,
1523        keys: impl IntoIterator<Item = ((IotaAddress, TypeTag), IotaResult<TotalBalance>)>,
1524    ) -> IotaResult {
1525        self.caches
1526            .per_coin_type_balance
1527            .batch_merge(keys, Self::merge_balance)
1528            .await;
1529        Ok(())
1530    }
1531
1532    fn merge_balance(
1533        old_balance: &IotaResult<TotalBalance>,
1534        balance_delta: &IotaResult<TotalBalance>,
1535    ) -> IotaResult<TotalBalance> {
1536        if let Ok(old_balance) = old_balance {
1537            if let Ok(balance_delta) = balance_delta {
1538                Ok(TotalBalance {
1539                    balance: old_balance.balance + balance_delta.balance,
1540                    num_coins: old_balance.num_coins + balance_delta.num_coins,
1541                })
1542            } else {
1543                balance_delta.clone()
1544            }
1545        } else {
1546            old_balance.clone()
1547        }
1548    }
1549
1550    async fn update_all_balance_cache(
1551        &self,
1552        keys: impl IntoIterator<Item = (IotaAddress, IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
1553    ) -> IotaResult {
1554        self.caches
1555            .all_balances
1556            .batch_merge(keys, Self::merge_all_balance)
1557            .await;
1558        Ok(())
1559    }
1560
1561    fn merge_all_balance(
1562        old_balance: &IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1563        balance_delta: &IotaResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1564    ) -> IotaResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1565        if let Ok(old_balance) = old_balance {
1566            if let Ok(balance_delta) = balance_delta {
1567                let mut new_balance = HashMap::new();
1568                for (key, value) in old_balance.iter() {
1569                    new_balance.insert(key.clone(), *value);
1570                }
1571                for (key, delta) in balance_delta.iter() {
1572                    let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
1573                        balance: 0,
1574                        num_coins: 0,
1575                    });
1576                    let new_total = TotalBalance {
1577                        balance: old.balance + delta.balance,
1578                        num_coins: old.num_coins + delta.num_coins,
1579                    };
1580                    new_balance.insert(key.clone(), new_total);
1581                }
1582                Ok(Arc::new(new_balance))
1583            } else {
1584                balance_delta.clone()
1585            }
1586        } else {
1587            old_balance.clone()
1588        }
1589    }
1590}
1591
1592#[cfg(test)]
1593mod tests {
1594    use std::{collections::BTreeMap, env::temp_dir};
1595
1596    use iota_types::{
1597        base_types::{IotaAddress, ObjectInfo, ObjectType},
1598        digests::TransactionDigest,
1599        effects::TransactionEvents,
1600        gas_coin::GAS,
1601        object,
1602        object::Owner,
1603    };
1604    use move_core_types::account_address::AccountAddress;
1605    use prometheus::Registry;
1606
1607    use crate::{IndexStore, indexes::ObjectIndexChanges};
1608
1609    #[tokio::test]
1610    async fn test_index_cache() -> anyhow::Result<()> {
1611        // This test is going to invoke `index_tx()`where 10 coins each with balance 100
1612        // are going to be added to an address. The balance is then going to be read
1613        // from the db and the cache. It should be 1000. Then, we are going to
1614        // delete 3 of those coins from the address and invoke `index_tx()`
1615        // again and read balance. The balance should be 700 and verified from
1616        // both db and cache. This tests make sure we are invalidating entries
1617        // in the cache and always reading latest balance.
1618        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
1619        let address: IotaAddress = AccountAddress::random().into();
1620        let mut written_objects = BTreeMap::new();
1621        let mut object_map = BTreeMap::new();
1622
1623        let mut new_objects = vec![];
1624        for _i in 0..10 {
1625            let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
1626            new_objects.push((
1627                (address, object.id()),
1628                ObjectInfo {
1629                    object_id: object.id(),
1630                    version: object.version(),
1631                    digest: object.digest(),
1632                    type_: ObjectType::Struct(object.type_().unwrap().clone()),
1633                    owner: Owner::AddressOwner(address),
1634                    previous_transaction: object.previous_transaction,
1635                },
1636            ));
1637            object_map.insert(object.id(), object.clone());
1638            written_objects.insert(object.data.id(), object);
1639        }
1640        let object_index_changes = ObjectIndexChanges {
1641            deleted_owners: vec![],
1642            deleted_dynamic_fields: vec![],
1643            new_owners: new_objects,
1644            new_dynamic_fields: vec![],
1645        };
1646
1647        let tx_coins = (object_map.clone(), written_objects.clone());
1648        index_store
1649            .index_tx(
1650                address,
1651                vec![].into_iter(),
1652                vec![].into_iter(),
1653                vec![].into_iter(),
1654                &TransactionEvents { data: vec![] },
1655                object_index_changes,
1656                &TransactionDigest::random(),
1657                1234,
1658                Some(tx_coins),
1659            )
1660            .await?;
1661
1662        let balance_from_db = IndexStore::get_balance_from_db(
1663            index_store.metrics.clone(),
1664            index_store.tables.coin_index.clone(),
1665            address,
1666            GAS::type_tag(),
1667        )?;
1668        let balance = index_store.get_balance(address, GAS::type_tag()).await?;
1669        assert_eq!(balance, balance_from_db);
1670        assert_eq!(balance.balance, 1000);
1671        assert_eq!(balance.num_coins, 10);
1672
1673        let all_balance = index_store.get_all_balance(address).await?;
1674        let balance = all_balance.get(&GAS::type_tag()).unwrap();
1675        assert_eq!(*balance, balance_from_db);
1676        assert_eq!(balance.balance, 1000);
1677        assert_eq!(balance.num_coins, 10);
1678
1679        written_objects.clear();
1680        let mut deleted_objects = vec![];
1681        for (id, object) in object_map.iter().take(3) {
1682            deleted_objects.push((address, *id));
1683            written_objects.insert(object.data.id(), object.clone());
1684        }
1685        let object_index_changes = ObjectIndexChanges {
1686            deleted_owners: deleted_objects,
1687            deleted_dynamic_fields: vec![],
1688            new_owners: vec![],
1689            new_dynamic_fields: vec![],
1690        };
1691        let tx_coins = (object_map, written_objects);
1692        index_store
1693            .index_tx(
1694                address,
1695                vec![].into_iter(),
1696                vec![].into_iter(),
1697                vec![].into_iter(),
1698                &TransactionEvents { data: vec![] },
1699                object_index_changes,
1700                &TransactionDigest::random(),
1701                1234,
1702                Some(tx_coins),
1703            )
1704            .await?;
1705        let balance_from_db = IndexStore::get_balance_from_db(
1706            index_store.metrics.clone(),
1707            index_store.tables.coin_index.clone(),
1708            address,
1709            GAS::type_tag(),
1710        )?;
1711        let balance = index_store.get_balance(address, GAS::type_tag()).await?;
1712        assert_eq!(balance, balance_from_db);
1713        assert_eq!(balance.balance, 700);
1714        assert_eq!(balance.num_coins, 7);
1715        // Invalidate per coin type balance cache and read from all balance cache to
1716        // ensure the balance matches
1717        index_store
1718            .caches
1719            .per_coin_type_balance
1720            .invalidate(&(address, GAS::type_tag()))
1721            .await;
1722        let all_balance = index_store.get_all_balance(address).await;
1723        let all_balance = all_balance?;
1724        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
1725        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
1726        let balance = index_store.get_balance(address, GAS::type_tag()).await?;
1727        assert_eq!(balance, balance_from_db);
1728        assert_eq!(balance.balance, 700);
1729        assert_eq!(balance.num_coins, 7);
1730
1731        Ok(())
1732    }
1733}