iota_indexer/
indexer_reader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashSet,
7    sync::{Arc, Mutex},
8};
9
10use anyhow::{Result, anyhow};
11use cached::{Cached, SizedCache};
12use diesel::{
13    ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, PgConnection,
14    QueryDsl, RunQueryDsl, SelectableHelper, TextExpressionMethods,
15    dsl::sql,
16    r2d2::ConnectionManager,
17    sql_query,
18    sql_types::{BigInt, Bool},
19};
20use fastcrypto::encoding::{Encoding, Hex};
21use iota_json_rpc_types::{
22    AddressMetrics, Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo,
23    EventFilter, IotaCoinMetadata, IotaEvent, IotaMoveValue, IotaObjectDataFilter,
24    IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
25    IotaTransactionKind, MoveCallMetrics, MoveFunctionName, NetworkMetrics, ParticipationMetrics,
26    TransactionFilter,
27};
28use iota_package_resolver::{Package, PackageStore, PackageStoreWithLruCache, Resolver};
29use iota_types::{
30    TypeTag,
31    balance::Supply,
32    base_types::{IotaAddress, ObjectID, SequenceNumber, VersionNumber},
33    coin::{CoinMetadata, TreasuryCap},
34    coin_manager::CoinManager,
35    committee::EpochId,
36    digests::{ChainIdentifier, TransactionDigest},
37    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
38    effects::TransactionEvents,
39    event::EventID,
40    iota_system_state::{
41        IotaSystemStateTrait,
42        iota_system_state_summary::{IotaSystemStateSummary, IotaValidatorSummary},
43    },
44    is_system_package,
45    messages_checkpoint::CheckpointDigest,
46    object::{Object, ObjectRead, PastObjectRead, bounded_visitor::BoundedVisitor},
47};
48use itertools::Itertools;
49use move_core_types::{annotated_value::MoveStructLayout, language_storage::StructTag};
50use tap::TapFallible;
51
52use crate::{
53    db::{ConnectionConfig, ConnectionPool, ConnectionPoolConfig},
54    errors::IndexerError,
55    models::{
56        address_metrics::StoredAddressMetrics,
57        checkpoints::{StoredChainIdentifier, StoredCheckpoint},
58        display::StoredDisplay,
59        epoch::StoredEpochInfo,
60        events::StoredEvent,
61        move_call_metrics::QueriedMoveCallMetrics,
62        network_metrics::StoredNetworkMetrics,
63        obj_indices::StoredObjectVersion,
64        objects::{CoinBalance, StoredHistoryObject, StoredObject},
65        participation_metrics::StoredParticipationMetrics,
66        transactions::{
67            OptimisticTransaction, StoredTransaction, StoredTransactionEvents,
68            stored_events_to_events, tx_events_to_iota_tx_events,
69        },
70        tx_indices::TxSequenceNumber,
71    },
72    schema::{
73        address_metrics, addresses, chain_identifier, checkpoints, display, epochs, events,
74        objects, objects_history, objects_snapshot, objects_version, optimistic_transactions,
75        packages, pruner_cp_watermark, transactions, tx_digests, tx_insertion_order,
76    },
77    store::{diesel_macro::*, package_resolver::IndexerStorePackageResolver},
78    types::{IndexerResult, OwnerType},
79};
80
81pub const TX_SEQUENCE_NUMBER_STR: &str = "tx_sequence_number";
82pub const TRANSACTION_DIGEST_STR: &str = "transaction_digest";
83pub const EVENT_SEQUENCE_NUMBER_STR: &str = "event_sequence_number";
84
85pub struct IndexerReader {
86    pool: ConnectionPool,
87    package_resolver: PackageResolver,
88    package_obj_type_cache: Arc<Mutex<SizedCache<String, Option<ObjectID>>>>,
89}
90
91impl Clone for IndexerReader {
92    fn clone(&self) -> IndexerReader {
93        IndexerReader {
94            pool: self.pool.clone(),
95            package_resolver: self.package_resolver.clone(),
96            package_obj_type_cache: self.package_obj_type_cache.clone(),
97        }
98    }
99}
100
101pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePackageResolver>>>;
102
103// Impl for common initialization and utilities
104impl IndexerReader {
105    pub fn new<T: Into<String>>(db_url: T) -> Result<Self> {
106        let config = ConnectionPoolConfig::default();
107        Self::new_with_config(db_url, config)
108    }
109
110    pub fn new_with_config<T: Into<String>>(
111        db_url: T,
112        config: ConnectionPoolConfig,
113    ) -> Result<Self> {
114        let manager = ConnectionManager::<PgConnection>::new(db_url);
115
116        let connection_config = ConnectionConfig {
117            statement_timeout: config.statement_timeout,
118            read_only: true,
119        };
120
121        let pool = diesel::r2d2::Pool::builder()
122            .max_size(config.pool_size)
123            .connection_timeout(config.connection_timeout)
124            .connection_customizer(Box::new(connection_config))
125            .build(manager)
126            .map_err(|e| anyhow!("Failed to initialize connection pool. Error: {:?}. If Error is None, please check whether the configured pool size (currently {}) exceeds the maximum number of connections allowed by the database.", e, config.pool_size))?;
127
128        let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
129        let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
130        let package_resolver = Arc::new(Resolver::new(package_cache));
131        let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000)));
132        Ok(Self {
133            pool,
134            package_resolver,
135            package_obj_type_cache,
136        })
137    }
138
139    pub async fn spawn_blocking<F, R, E>(&self, f: F) -> Result<R, E>
140    where
141        F: FnOnce(Self) -> Result<R, E> + Send + 'static,
142        R: Send + 'static,
143        E: Send + 'static,
144    {
145        let this = self.clone();
146        let current_span = tracing::Span::current();
147        tokio::task::spawn_blocking(move || {
148            CALLED_FROM_BLOCKING_POOL
149                .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
150            let _guard = current_span.enter();
151            f(this)
152        })
153        .await
154        .expect("propagate any panics")
155    }
156
157    pub fn get_pool(&self) -> ConnectionPool {
158        self.pool.clone()
159    }
160}
161
162// Impl for reading data from the DB
163impl IndexerReader {
164    fn get_object_from_db(
165        &self,
166        object_id: &ObjectID,
167        version: Option<VersionNumber>,
168    ) -> Result<Option<StoredObject>, IndexerError> {
169        let object_id = object_id.to_vec();
170
171        let stored_object = run_query!(&self.pool, |conn| {
172            if let Some(version) = version {
173                objects::dsl::objects
174                    .filter(objects::dsl::object_id.eq(object_id))
175                    .filter(objects::dsl::object_version.eq(version.value() as i64))
176                    .first::<StoredObject>(conn)
177                    .optional()
178            } else {
179                objects::dsl::objects
180                    .filter(objects::dsl::object_id.eq(object_id))
181                    .first::<StoredObject>(conn)
182                    .optional()
183            }
184        })?;
185        Ok(stored_object)
186    }
187
188    fn get_object(
189        &self,
190        object_id: &ObjectID,
191        version: Option<VersionNumber>,
192    ) -> Result<Option<Object>, IndexerError> {
193        let Some(stored_package) = self.get_object_from_db(object_id, version)? else {
194            return Ok(None);
195        };
196
197        let object = stored_package.try_into()?;
198        Ok(Some(object))
199    }
200
201    pub async fn get_object_in_blocking_task(
202        &self,
203        object_id: ObjectID,
204    ) -> Result<Option<Object>, IndexerError> {
205        self.spawn_blocking(move |this| this.get_object(&object_id, None))
206            .await
207    }
208
209    pub async fn get_object_read_in_blocking_task(
210        &self,
211        object_id: ObjectID,
212    ) -> Result<ObjectRead, IndexerError> {
213        let stored_object = self
214            .spawn_blocking(move |this| this.get_object_raw(object_id))
215            .await?;
216
217        if let Some(object) = stored_object {
218            object
219                .try_into_object_read(self.package_resolver.clone())
220                .await
221        } else {
222            Ok(ObjectRead::NotExists(object_id))
223        }
224    }
225
226    fn get_object_raw(&self, object_id: ObjectID) -> Result<Option<StoredObject>, IndexerError> {
227        let id = object_id.to_vec();
228        let stored_object = run_query!(&self.pool, |conn| {
229            objects::dsl::objects
230                .filter(objects::dsl::object_id.eq(id))
231                .first::<StoredObject>(conn)
232                .optional()
233        })?;
234        Ok(stored_object)
235    }
236
237    /// Fetches a past object by its ID and version.
238    ///
239    /// - If `before_version` is `false`, it looks for the exact version.
240    /// - If `true`, it finds the latest version before the given one.
241    ///
242    /// Searches the requested object version and checkpoint sequence number
243    /// in `objects_version` and fetches the requested object from
244    /// `objects_history`.
245    pub(crate) async fn get_past_object_read(
246        &self,
247        object_id: ObjectID,
248        object_version: SequenceNumber,
249        before_version: bool,
250    ) -> Result<PastObjectRead, IndexerError> {
251        let object_version_num = object_version.value() as i64;
252
253        // Query objects_version to find the requested version and relevant
254        // checkpoint sequence number considering the `before_version` flag.
255        let pool = self.get_pool();
256        let object_id_bytes = object_id.to_vec();
257        let object_version_info: Option<StoredObjectVersion> =
258            run_query_async!(&pool, move |conn| {
259                let mut query = objects_version::dsl::objects_version
260                    .filter(objects_version::object_id.eq(&object_id_bytes))
261                    .into_boxed();
262
263                if before_version {
264                    query = query.filter(objects_version::object_version.lt(object_version_num));
265                } else {
266                    query = query.filter(objects_version::object_version.eq(object_version_num));
267                }
268
269                query
270                    .order_by(objects_version::object_version.desc())
271                    .limit(1)
272                    .first::<StoredObjectVersion>(conn)
273                    .optional()
274            })?;
275
276        let Some(object_version_info) = object_version_info else {
277            // Check if the object ever existed.
278            let pool = self.get_pool();
279            let object_id_bytes = object_id.to_vec();
280            let latest_existing_version: Option<i64> = run_query_async!(&pool, move |conn| {
281                objects_version::dsl::objects_version
282                    .filter(objects_version::object_id.eq(&object_id_bytes))
283                    .order_by(objects_version::object_version.desc())
284                    .select(objects_version::object_version)
285                    .limit(1)
286                    .first::<i64>(conn)
287                    .optional()
288            })?;
289
290            return match latest_existing_version {
291                Some(latest) if object_version_num > latest => Ok(PastObjectRead::VersionTooHigh {
292                    object_id,
293                    asked_version: object_version,
294                    latest_version: SequenceNumber::from(latest as u64),
295                }),
296                Some(_) => Ok(PastObjectRead::VersionNotFound(object_id, object_version)),
297                None => Ok(PastObjectRead::ObjectNotExists(object_id)),
298            };
299        };
300
301        // Query objects_history for the object with the requested version.
302        let history_object = self
303            .get_stored_history_object(
304                object_id,
305                object_version_info.object_version,
306                object_version_info.cp_sequence_number,
307            )
308            .await?;
309
310        match history_object {
311            Some(obj) => {
312                obj.try_into_past_object_read(self.package_resolver.clone())
313                    .await
314            }
315            None => Err(IndexerError::PersistentStorageDataCorruption(format!(
316                "Object version {} not found in objects_history for object {}",
317                object_version_info.object_version, object_id
318            ))),
319        }
320    }
321
322    pub async fn get_stored_history_object(
323        &self,
324        object_id: ObjectID,
325        object_version: i64,
326        checkpoint_sequence_number: i64,
327    ) -> Result<Option<StoredHistoryObject>, IndexerError> {
328        let pool = self.get_pool();
329        let object_id_bytes = object_id.to_vec();
330        run_query_async!(&pool, move |conn| {
331            // Match on the primary key.
332            let query = objects_history::dsl::objects_history
333                .filter(objects_history::checkpoint_sequence_number.eq(checkpoint_sequence_number))
334                .filter(objects_history::object_id.eq(&object_id_bytes))
335                .filter(objects_history::object_version.eq(object_version))
336                .into_boxed();
337
338            query
339                .order_by(objects_history::object_version.desc())
340                .limit(1)
341                .first::<StoredHistoryObject>(conn)
342                .optional()
343        })
344    }
345
346    pub async fn get_package(&self, package_id: ObjectID) -> Result<Package, IndexerError> {
347        let store = self.package_resolver.package_store();
348        let pkg = store
349            .fetch(package_id.into())
350            .await
351            .map_err(|e| {
352                IndexerError::PostgresRead(format!(
353                    "Fail to fetch package from package store with error {:?}",
354                    e
355                ))
356            })?
357            .as_ref()
358            .clone();
359        Ok(pkg)
360    }
361
362    pub fn get_epoch_info_from_db(
363        &self,
364        epoch: Option<EpochId>,
365    ) -> Result<Option<StoredEpochInfo>, IndexerError> {
366        let stored_epoch = run_query!(&self.pool, |conn| {
367            if let Some(epoch) = epoch {
368                epochs::dsl::epochs
369                    .filter(epochs::epoch.eq(epoch as i64))
370                    .first::<StoredEpochInfo>(conn)
371                    .optional()
372            } else {
373                epochs::dsl::epochs
374                    .order_by(epochs::epoch.desc())
375                    .first::<StoredEpochInfo>(conn)
376                    .optional()
377            }
378        })?;
379
380        Ok(stored_epoch)
381    }
382
383    pub fn get_latest_epoch_info_from_db(&self) -> Result<StoredEpochInfo, IndexerError> {
384        let stored_epoch = run_query!(&self.pool, |conn| {
385            epochs::dsl::epochs
386                .order_by(epochs::epoch.desc())
387                .first::<StoredEpochInfo>(conn)
388        })?;
389
390        Ok(stored_epoch)
391    }
392
393    pub fn get_epoch_info(
394        &self,
395        epoch: Option<EpochId>,
396    ) -> Result<Option<EpochInfo>, IndexerError> {
397        let stored_epoch = self.get_epoch_info_from_db(epoch)?;
398
399        let stored_epoch = match stored_epoch {
400            Some(stored_epoch) => stored_epoch,
401            None => return Ok(None),
402        };
403
404        let epoch_info = EpochInfo::try_from(stored_epoch)?;
405        Ok(Some(epoch_info))
406    }
407
408    fn get_epochs_from_db(
409        &self,
410        cursor: Option<u64>,
411        limit: usize,
412        descending_order: bool,
413    ) -> Result<Vec<StoredEpochInfo>, IndexerError> {
414        run_query!(&self.pool, |conn| {
415            let mut boxed_query = epochs::table.into_boxed();
416            if let Some(cursor) = cursor {
417                if descending_order {
418                    boxed_query = boxed_query.filter(epochs::epoch.lt(cursor as i64));
419                } else {
420                    boxed_query = boxed_query.filter(epochs::epoch.gt(cursor as i64));
421                }
422            }
423            if descending_order {
424                boxed_query = boxed_query.order_by(epochs::epoch.desc());
425            } else {
426                boxed_query = boxed_query.order_by(epochs::epoch.asc());
427            }
428
429            boxed_query.limit(limit as i64).load(conn)
430        })
431    }
432
433    pub fn get_epochs(
434        &self,
435        cursor: Option<u64>,
436        limit: usize,
437        descending_order: bool,
438    ) -> Result<Vec<EpochInfo>, IndexerError> {
439        self.get_epochs_from_db(cursor, limit, descending_order)?
440            .into_iter()
441            .map(EpochInfo::try_from)
442            .collect::<Result<Vec<_>, _>>()
443    }
444
445    pub fn get_latest_iota_system_state(&self) -> Result<IotaSystemStateSummary, IndexerError> {
446        let system_state: IotaSystemStateSummary =
447            iota_types::iota_system_state::get_iota_system_state(self)?
448                .into_iota_system_state_summary();
449        Ok(system_state)
450    }
451
452    /// Retrieve the system state data for the given epoch. If no epoch is
453    /// given, it will retrieve the latest epoch's data and return the
454    /// system state. System state of the an epoch is written at the end of
455    /// the epoch, so system state of the current epoch is empty until the
456    /// epoch ends. You can call `get_latest_iota_system_state` for current
457    /// epoch instead.
458    pub fn get_epoch_iota_system_state(
459        &self,
460        epoch: Option<EpochId>,
461    ) -> Result<IotaSystemStateSummary, IndexerError> {
462        let stored_epoch = self.get_epoch_info_from_db(epoch)?;
463        let stored_epoch = match stored_epoch {
464            Some(stored_epoch) => stored_epoch,
465            None => return Err(IndexerError::InvalidArgument("Invalid epoch".into())),
466        };
467
468        let system_state: IotaSystemStateSummary = bcs::from_bytes(&stored_epoch.system_state)
469            .map_err(|_| {
470                IndexerError::PersistentStorageDataCorruption(format!(
471                    "Failed to deserialize `system_state` for epoch {:?}",
472                    epoch,
473                ))
474            })?;
475        Ok(system_state)
476    }
477
478    pub async fn get_chain_identifier_in_blocking_task(
479        &self,
480    ) -> Result<ChainIdentifier, IndexerError> {
481        self.spawn_blocking(|this| this.get_chain_identifier())
482            .await
483    }
484
485    pub fn get_chain_identifier(&self) -> Result<ChainIdentifier, IndexerError> {
486        let stored_chain_identifier = run_query!(&self.pool, |conn| {
487            chain_identifier::dsl::chain_identifier
488                .first::<StoredChainIdentifier>(conn)
489                .optional()
490        })?
491        .ok_or(IndexerError::PostgresRead(
492            "chain identifier not found".to_string(),
493        ))?;
494
495        let checkpoint_digest =
496            CheckpointDigest::try_from(stored_chain_identifier.checkpoint_digest).map_err(|e| {
497                IndexerError::PersistentStorageDataCorruption(format!(
498                    "failed to decode chain identifier with err: {e:?}"
499                ))
500            })?;
501
502        Ok(checkpoint_digest.into())
503    }
504
505    pub fn get_checkpoint_from_db(
506        &self,
507        checkpoint_id: CheckpointId,
508    ) -> Result<Option<StoredCheckpoint>, IndexerError> {
509        let stored_checkpoint = run_query!(&self.pool, |conn| {
510            match checkpoint_id {
511                CheckpointId::SequenceNumber(seq) => checkpoints::dsl::checkpoints
512                    .filter(checkpoints::sequence_number.eq(seq as i64))
513                    .first::<StoredCheckpoint>(conn)
514                    .optional(),
515                CheckpointId::Digest(digest) => checkpoints::dsl::checkpoints
516                    .filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
517                    .first::<StoredCheckpoint>(conn)
518                    .optional(),
519            }
520        })?;
521
522        Ok(stored_checkpoint)
523    }
524
525    pub fn get_latest_checkpoint_from_db(&self) -> Result<StoredCheckpoint, IndexerError> {
526        let stored_checkpoint = run_query!(&self.pool, |conn| {
527            checkpoints::dsl::checkpoints
528                .order_by(checkpoints::sequence_number.desc())
529                .first::<StoredCheckpoint>(conn)
530        })?;
531
532        Ok(stored_checkpoint)
533    }
534
535    pub fn get_checkpoint(
536        &self,
537        checkpoint_id: CheckpointId,
538    ) -> Result<Option<iota_json_rpc_types::Checkpoint>, IndexerError> {
539        let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id)? {
540            Some(stored_checkpoint) => stored_checkpoint,
541            None => return Ok(None),
542        };
543
544        let checkpoint = iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)?;
545        Ok(Some(checkpoint))
546    }
547
548    pub fn get_latest_checkpoint(&self) -> Result<iota_json_rpc_types::Checkpoint, IndexerError> {
549        let stored_checkpoint = self.get_latest_checkpoint_from_db()?;
550
551        iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)
552    }
553
554    pub async fn get_latest_checkpoint_timestamp_ms_in_blocking_task(
555        &self,
556    ) -> Result<u64, IndexerError> {
557        self.spawn_blocking(|this| this.get_latest_checkpoint_timestamp_ms())
558            .await
559    }
560
561    pub fn get_latest_checkpoint_timestamp_ms(&self) -> Result<u64, IndexerError> {
562        Ok(self.get_latest_checkpoint()?.timestamp_ms)
563    }
564
565    fn get_checkpoints_from_db(
566        &self,
567        cursor: Option<u64>,
568        limit: usize,
569        descending_order: bool,
570    ) -> Result<Vec<StoredCheckpoint>, IndexerError> {
571        run_query!(&self.pool, |conn| {
572            let mut boxed_query = checkpoints::table.into_boxed();
573            if let Some(cursor) = cursor {
574                if descending_order {
575                    boxed_query =
576                        boxed_query.filter(checkpoints::sequence_number.lt(cursor as i64));
577                } else {
578                    boxed_query =
579                        boxed_query.filter(checkpoints::sequence_number.gt(cursor as i64));
580                }
581            }
582            if descending_order {
583                boxed_query = boxed_query.order_by(checkpoints::sequence_number.desc());
584            } else {
585                boxed_query = boxed_query.order_by(checkpoints::sequence_number.asc());
586            }
587
588            boxed_query
589                .limit(limit as i64)
590                .load::<StoredCheckpoint>(conn)
591        })
592    }
593
594    pub fn get_checkpoints(
595        &self,
596        cursor: Option<u64>,
597        limit: usize,
598        descending_order: bool,
599    ) -> Result<Vec<iota_json_rpc_types::Checkpoint>, IndexerError> {
600        self.get_checkpoints_from_db(cursor, limit, descending_order)?
601            .into_iter()
602            .map(iota_json_rpc_types::Checkpoint::try_from)
603            .collect()
604    }
605
606    fn get_transaction_effects_with_digest(
607        &self,
608        digest: TransactionDigest,
609    ) -> Result<IotaTransactionBlockEffects, IndexerError> {
610        let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
611            transactions::table
612                .filter(
613                    transactions::tx_sequence_number
614                        .nullable()
615                        .eq(tx_digests::table
616                            .select(tx_digests::tx_sequence_number)
617                            // we filter the tx_digests table because it is indexed by digest,
618                            // transactions table is not
619                            .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
620                            .single_value()),
621                )
622                .first::<StoredTransaction>(conn)
623        })?;
624
625        stored_txn.try_into_iota_transaction_effects()
626    }
627
628    fn get_transaction_effects_with_sequence_number(
629        &self,
630        sequence_number: i64,
631    ) -> Result<IotaTransactionBlockEffects, IndexerError> {
632        let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
633            transactions::table
634                .filter(transactions::tx_sequence_number.eq(sequence_number))
635                .first::<StoredTransaction>(conn)
636        })?;
637
638        stored_txn.try_into_iota_transaction_effects()
639    }
640
641    fn multi_get_transactions(
642        &self,
643        digests: &[TransactionDigest],
644    ) -> Result<Vec<StoredTransaction>, IndexerError> {
645        let digests = digests
646            .iter()
647            .map(|digest| digest.inner().to_vec())
648            .collect::<HashSet<_>>();
649        let checkpointed_txs = run_query!(&self.pool, |conn| {
650            transactions::table
651                .inner_join(
652                    tx_digests::table
653                        .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
654                )
655                // we filter the tx_digests table because it is indexed by digest,
656                // transactions table is not
657                .filter(tx_digests::tx_digest.eq_any(&digests))
658                .select(StoredTransaction::as_select())
659                .load::<StoredTransaction>(conn)
660        })?;
661        if checkpointed_txs.len() == digests.len() {
662            return Ok(checkpointed_txs);
663        }
664        let mut missing_digests = digests;
665        for tx in &checkpointed_txs {
666            missing_digests.remove(&tx.transaction_digest);
667        }
668        let optimistic_txs = run_query!(&self.pool, |conn| {
669            optimistic_transactions::table
670                .inner_join(
671                    tx_insertion_order::table.on(optimistic_transactions::insertion_order
672                        .eq(tx_insertion_order::insertion_order)),
673                )
674                // we filter the tx_insertion_order table because it is indexed by digest,
675                // optimistic_transactions table is not
676                .filter(tx_insertion_order::tx_digest.eq_any(missing_digests))
677                .select(OptimisticTransaction::as_select())
678                .load::<OptimisticTransaction>(conn)
679        })?;
680        Ok(checkpointed_txs
681            .into_iter()
682            .chain(optimistic_txs.into_iter().map(Into::into))
683            .collect())
684    }
685
686    async fn multi_get_transactions_in_blocking_task(
687        &self,
688        digests: Vec<TransactionDigest>,
689    ) -> Result<Vec<StoredTransaction>, IndexerError> {
690        self.spawn_blocking(move |this| this.multi_get_transactions(&digests))
691            .await
692    }
693
694    /// This method tries to transform [`StoredTransaction`] values
695    /// into transaction blocks, without any other modification.
696    async fn stored_transaction_to_transaction_block(
697        &self,
698        stored_txes: Vec<StoredTransaction>,
699        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
700    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
701        let mut tx_block_responses_futures = vec![];
702        for stored_tx in stored_txes {
703            let package_resolver_clone = self.package_resolver();
704            let options_clone = options.clone();
705            tx_block_responses_futures.push(tokio::task::spawn(
706                stored_tx.try_into_iota_transaction_block_response(
707                    options_clone,
708                    package_resolver_clone,
709                ),
710            ));
711        }
712
713        let tx_blocks = futures::future::join_all(tx_block_responses_futures)
714            .await
715            .into_iter()
716            .collect::<Result<Vec<_>, _>>()
717            .tap_err(|e| tracing::error!("Failed to join all tx block futures: {}", e))?
718            .into_iter()
719            .collect::<Result<Vec<_>, _>>()
720            .tap_err(|e| tracing::error!("Failed to collect tx block futures: {}", e))?;
721        Ok(tx_blocks)
722    }
723
724    fn multi_get_transactions_with_sequence_numbers(
725        &self,
726        tx_sequence_numbers: Vec<i64>,
727        // Some(true) for desc, Some(false) for asc, None for undefined order
728        is_descending: Option<bool>,
729    ) -> Result<Vec<StoredTransaction>, IndexerError> {
730        let mut query = transactions::table
731            .filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers))
732            .into_boxed();
733        match is_descending {
734            Some(true) => {
735                query = query.order(transactions::dsl::tx_sequence_number.desc());
736            }
737            Some(false) => {
738                query = query.order(transactions::dsl::tx_sequence_number.asc());
739            }
740            None => (),
741        }
742        run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
743    }
744
745    pub async fn get_owned_objects_in_blocking_task(
746        &self,
747        address: IotaAddress,
748        filter: Option<IotaObjectDataFilter>,
749        cursor: Option<ObjectID>,
750        limit: usize,
751    ) -> Result<Vec<StoredObject>, IndexerError> {
752        self.spawn_blocking(move |this| this.get_owned_objects_impl(address, filter, cursor, limit))
753            .await
754    }
755
756    fn get_owned_objects_impl(
757        &self,
758        address: IotaAddress,
759        filter: Option<IotaObjectDataFilter>,
760        cursor: Option<ObjectID>,
761        limit: usize,
762    ) -> Result<Vec<StoredObject>, IndexerError> {
763        run_query!(&self.pool, |conn| {
764            let mut query = objects::dsl::objects
765                .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
766                .filter(objects::dsl::owner_id.eq(address.to_vec()))
767                .order(objects::dsl::object_id.asc())
768                .limit(limit as i64)
769                .into_boxed();
770            if let Some(filter) = filter {
771                match filter {
772                    IotaObjectDataFilter::StructType(struct_tag) => {
773                        let object_type =
774                            struct_tag.to_canonical_string(/* with_prefix */ true);
775                        query =
776                            query.filter(objects::object_type.like(format!("{}%", object_type)));
777                    }
778                    IotaObjectDataFilter::MatchAny(filters) => {
779                        let mut condition = "(".to_string();
780                        for (i, filter) in filters.iter().enumerate() {
781                            if let IotaObjectDataFilter::StructType(struct_tag) = filter {
782                                let object_type =
783                                    struct_tag.to_canonical_string(/* with_prefix */ true);
784                                if i == 0 {
785                                    condition +=
786                                        format!("objects.object_type LIKE '{}%'", object_type)
787                                            .as_str();
788                                } else {
789                                    condition +=
790                                        format!(" OR objects.object_type LIKE '{}%'", object_type)
791                                            .as_str();
792                                }
793                            } else {
794                                return Err(IndexerError::InvalidArgument(
795                                    "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
796                                ));
797                            }
798                        }
799                        condition += ")";
800                        query = query.filter(sql::<Bool>(&condition));
801                    }
802                    IotaObjectDataFilter::MatchNone(filters) => {
803                        for filter in filters {
804                            if let IotaObjectDataFilter::StructType(struct_tag) = filter {
805                                let object_type =
806                                    struct_tag.to_canonical_string(/* with_prefix */ true);
807                                query = query.filter(
808                                    objects::object_type.not_like(format!("{}%", object_type)),
809                                );
810                            } else {
811                                return Err(IndexerError::InvalidArgument(
812                                    "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
813                                ));
814                            }
815                        }
816                    }
817                    _ => {
818                        return Err(IndexerError::InvalidArgument(
819                            "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
820                        ));
821                    }
822                }
823            }
824
825            if let Some(object_cursor) = cursor {
826                query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
827            }
828
829            query
830                .load::<StoredObject>(conn)
831                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
832        })
833    }
834
835    fn get_singleton_object(&self, struct_tag: StructTag) -> Result<Option<Object>, IndexerError> {
836        let object_type = struct_tag.to_canonical_string(/* with_prefix */ true);
837
838        run_query!(&self.pool, |conn| {
839            let object = match objects::dsl::objects
840                .filter(objects::object_type_package.eq(struct_tag.address.to_vec()))
841                .filter(objects::object_type_module.eq(struct_tag.module.to_string()))
842                .filter(objects::object_type_name.eq(struct_tag.name.to_string()))
843                .filter(objects::object_type.eq(object_type))
844                .first::<StoredObject>(conn)
845                .optional()
846                .map_err(|e| IndexerError::PostgresRead(e.to_string()))?
847            {
848                Some(object) => object,
849                None => return Ok::<Option<Object>, IndexerError>(None),
850            }
851            .try_into()?;
852            Ok(Some(object))
853        })
854    }
855
856    fn filter_object_id_with_type(
857        &self,
858        object_ids: Vec<ObjectID>,
859        object_type: String,
860    ) -> Result<Vec<ObjectID>, IndexerError> {
861        let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
862        let filtered_ids = run_query!(&self.pool, |conn| {
863            objects::dsl::objects
864                .filter(objects::object_id.eq_any(object_ids))
865                .filter(objects::object_type.eq(object_type))
866                .select(objects::object_id)
867                .load::<Vec<u8>>(conn)
868        })?;
869
870        filtered_ids
871            .into_iter()
872            .map(|id| {
873                ObjectID::from_bytes(id.clone()).map_err(|_e| {
874                    IndexerError::PersistentStorageDataCorruption(format!(
875                        "Can't convert {:?} to ObjectID",
876                        id,
877                    ))
878                })
879            })
880            .collect::<Result<Vec<_>, _>>()
881    }
882
883    pub async fn multi_get_objects_in_blocking_task(
884        &self,
885        object_ids: Vec<ObjectID>,
886    ) -> Result<Vec<StoredObject>, IndexerError> {
887        self.spawn_blocking(move |this| this.multi_get_objects_impl(object_ids))
888            .await
889    }
890
891    fn multi_get_objects_impl(
892        &self,
893        object_ids: Vec<ObjectID>,
894    ) -> Result<Vec<StoredObject>, IndexerError> {
895        let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
896        run_query!(&self.pool, |conn| {
897            objects::dsl::objects
898                .filter(objects::object_id.eq_any(object_ids))
899                .load::<StoredObject>(conn)
900        })
901    }
902
903    async fn query_transaction_blocks_by_checkpoint_impl(
904        &self,
905        checkpoint_seq: u64,
906        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
907        cursor_tx_seq: Option<i64>,
908        limit: usize,
909        is_descending: bool,
910    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
911        let pool = self.get_pool();
912        let tx_range: (i64, i64) = run_query_async!(&pool, move |conn| {
913            pruner_cp_watermark::dsl::pruner_cp_watermark
914                .select((
915                    pruner_cp_watermark::min_tx_sequence_number,
916                    pruner_cp_watermark::max_tx_sequence_number,
917                ))
918                // we filter the pruner_cp_watermark table because it is indexed by
919                // checkpoint_sequence_number, transactions is not
920                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
921                .first::<(i64, i64)>(conn)
922        })?;
923
924        let mut query = transactions::dsl::transactions
925            .filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
926            .into_boxed();
927
928        // Translate transaction digest cursor to tx sequence number
929        if let Some(cursor_tx_seq) = cursor_tx_seq {
930            if is_descending {
931                query = query.filter(transactions::dsl::tx_sequence_number.lt(cursor_tx_seq));
932            } else {
933                query = query.filter(transactions::dsl::tx_sequence_number.gt(cursor_tx_seq));
934            }
935        }
936        if is_descending {
937            query = query.order(transactions::dsl::tx_sequence_number.desc());
938        } else {
939            query = query.order(transactions::dsl::tx_sequence_number.asc());
940        }
941        let pool = self.get_pool();
942        let stored_txes = run_query_async!(&pool, move |conn| query
943            .limit(limit as i64)
944            .load::<StoredTransaction>(conn))?;
945
946        self.stored_transaction_to_transaction_block(stored_txes, options)
947            .await
948    }
949
950    pub async fn query_transaction_blocks_in_blocking_task(
951        &self,
952        filter: Option<TransactionFilter>,
953        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
954        cursor: Option<TransactionDigest>,
955        limit: usize,
956        is_descending: bool,
957    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
958        self.query_transaction_blocks_impl(filter, options, cursor, limit, is_descending)
959            .await
960    }
961
962    async fn query_transaction_blocks_impl(
963        &self,
964        filter: Option<TransactionFilter>,
965        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
966        cursor: Option<TransactionDigest>,
967        limit: usize,
968        is_descending: bool,
969    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
970        let cursor_tx_seq = if let Some(cursor) = cursor {
971            let pool = self.get_pool();
972            let tx_seq = run_query_async!(&pool, move |conn| {
973                tx_digests::table
974                    .select(tx_digests::tx_sequence_number)
975                    // we filter the tx_digests table because it is indexed by digest,
976                    // transactions (and other tables) are not
977                    .filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
978                    .first::<i64>(conn)
979            })?;
980            Some(tx_seq)
981        } else {
982            None
983        };
984        let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
985            if is_descending {
986                format!("AND {TX_SEQUENCE_NUMBER_STR} < {}", cursor_tx_seq)
987            } else {
988                format!("AND {TX_SEQUENCE_NUMBER_STR} > {}", cursor_tx_seq)
989            }
990        } else {
991            "".to_string()
992        };
993        let order_str = if is_descending { "DESC" } else { "ASC" };
994        let (table_name, main_where_clause) = match filter {
995            // Processed above
996            Some(TransactionFilter::Checkpoint(seq)) => {
997                return self
998                    .query_transaction_blocks_by_checkpoint_impl(
999                        seq,
1000                        options,
1001                        cursor_tx_seq,
1002                        limit,
1003                        is_descending,
1004                    )
1005                    .await;
1006            }
1007            // FIXME: sanitize module & function
1008            Some(TransactionFilter::MoveFunction {
1009                package,
1010                module,
1011                function,
1012            }) => {
1013                let package = Hex::encode(package.to_vec());
1014                match (module, function) {
1015                    (Some(module), Some(function)) => (
1016                        "tx_calls_fun".into(),
1017                        format!(
1018                            "package = '\\x{}'::bytea AND module = '{}' AND func = '{}'",
1019                            package, module, function
1020                        ),
1021                    ),
1022                    (Some(module), None) => (
1023                        "tx_calls_mod".into(),
1024                        format!(
1025                            "package = '\\x{}'::bytea AND module = '{}'",
1026                            package, module
1027                        ),
1028                    ),
1029                    (None, Some(_)) => {
1030                        return Err(IndexerError::InvalidArgument(
1031                            "Function cannot be present without Module.".into(),
1032                        ));
1033                    }
1034                    (None, None) => (
1035                        "tx_calls_pkg".into(),
1036                        format!("package = '\\x{}'::bytea", package),
1037                    ),
1038                }
1039            }
1040            Some(TransactionFilter::InputObject(object_id)) => {
1041                let object_id = Hex::encode(object_id.to_vec());
1042                (
1043                    "tx_input_objects".into(),
1044                    format!("object_id = '\\x{}'::bytea", object_id),
1045                )
1046            }
1047            Some(TransactionFilter::ChangedObject(object_id)) => {
1048                let object_id = Hex::encode(object_id.to_vec());
1049                (
1050                    "tx_changed_objects".into(),
1051                    format!("object_id = '\\x{}'::bytea", object_id),
1052                )
1053            }
1054            Some(TransactionFilter::FromAddress(from_address)) => {
1055                let from_address = Hex::encode(from_address.to_vec());
1056                (
1057                    "tx_senders".into(),
1058                    format!("sender = '\\x{}'::bytea", from_address),
1059                )
1060            }
1061            Some(TransactionFilter::ToAddress(to_address)) => {
1062                let to_address = Hex::encode(to_address.to_vec());
1063                (
1064                    "tx_recipients".into(),
1065                    format!("recipient = '\\x{}'::bytea", to_address),
1066                )
1067            }
1068            Some(TransactionFilter::FromAndToAddress { from, to }) => {
1069                let from_address = Hex::encode(from.to_vec());
1070                let to_address = Hex::encode(to.to_vec());
1071                // Need to remove ambiguities for tx_sequence_number column
1072                let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
1073                    if is_descending {
1074                        format!(
1075                            "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} < {}",
1076                            cursor_tx_seq
1077                        )
1078                    } else {
1079                        format!(
1080                            "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} > {}",
1081                            cursor_tx_seq
1082                        )
1083                    }
1084                } else {
1085                    "".to_string()
1086                };
1087                let inner_query = format!(
1088                    "(SELECT tx_senders.{TX_SEQUENCE_NUMBER_STR} \
1089                    FROM tx_senders \
1090                    JOIN tx_recipients \
1091                    ON tx_senders.{TX_SEQUENCE_NUMBER_STR} = tx_recipients.{TX_SEQUENCE_NUMBER_STR} \
1092                    WHERE tx_senders.sender = '\\x{}'::BYTEA \
1093                    AND tx_recipients.recipient = '\\x{}'::BYTEA \
1094                    {} \
1095                    ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1096                    LIMIT {}) AS inner_query
1097                    ",
1098                    from_address,
1099                    to_address,
1100                    cursor_clause,
1101                    order_str,
1102                    limit,
1103                );
1104                (inner_query, "1 = 1".into())
1105            }
1106            Some(TransactionFilter::FromOrToAddress { addr }) => {
1107                let address = Hex::encode(addr.to_vec());
1108                let inner_query = format!(
1109                    "( \
1110                        ( \
1111                            SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_senders \
1112                            WHERE sender = '\\x{}'::BYTEA {} \
1113                            ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1114                            LIMIT {} \
1115                        ) \
1116                        UNION \
1117                        ( \
1118                            SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_recipients \
1119                            WHERE recipient = '\\x{}'::BYTEA {} \
1120                            ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1121                            LIMIT {} \
1122                        ) \
1123                    ) AS combined",
1124                    address,
1125                    cursor_clause,
1126                    order_str,
1127                    limit,
1128                    address,
1129                    cursor_clause,
1130                    order_str,
1131                    limit,
1132                );
1133                (inner_query, "1 = 1".into())
1134            }
1135            Some(TransactionFilter::TransactionKind(kind)) => {
1136                // The `SystemTransaction` variant can be used to filter for all types of system
1137                // transactions.
1138                if kind == IotaTransactionKind::SystemTransaction {
1139                    ("tx_kinds".into(), "tx_kind != 1".to_string())
1140                } else {
1141                    ("tx_kinds".into(), format!("tx_kind = {}", kind as u8))
1142                }
1143            }
1144            Some(TransactionFilter::TransactionKindIn(kind_vec)) => {
1145                if kind_vec.is_empty() {
1146                    return Err(IndexerError::InvalidArgument(
1147                        "no transaction kind provided".into(),
1148                    ));
1149                }
1150
1151                let mut has_system_transaction = false;
1152                let mut has_programmable_transaction = false;
1153                let mut other_kinds = HashSet::new();
1154
1155                for kind in kind_vec.iter() {
1156                    match kind {
1157                        IotaTransactionKind::SystemTransaction => has_system_transaction = true,
1158                        IotaTransactionKind::ProgrammableTransaction => {
1159                            has_programmable_transaction = true
1160                        }
1161                        other => {
1162                            other_kinds.insert(*other as u8);
1163                        }
1164                    }
1165                }
1166
1167                let query = if has_system_transaction {
1168                    // Case: If `SystemTransaction` is present but `ProgrammableTransaction` is not,
1169                    // we need to filter out `ProgrammableTransaction`.
1170                    if !has_programmable_transaction {
1171                        "tx_kind != 1".to_string()
1172                    } else {
1173                        // No filter applied if both exist
1174                        "1 = 1".to_string()
1175                    }
1176                } else {
1177                    // Case: `ProgrammableTransaction` is present
1178                    if has_programmable_transaction {
1179                        other_kinds.insert(IotaTransactionKind::ProgrammableTransaction as u8);
1180                    }
1181
1182                    if other_kinds.is_empty() {
1183                        // If there's nothing to filter on, return an empty query
1184                        "1 = 1".to_string()
1185                    } else {
1186                        let mut query = String::from("tx_kind IN (");
1187                        query.push_str(
1188                            &other_kinds
1189                                .iter()
1190                                .map(ToString::to_string)
1191                                .collect::<Vec<_>>()
1192                                .join(", "),
1193                        );
1194                        query.push(')');
1195                        query
1196                    }
1197                };
1198
1199                ("tx_kinds".into(), query)
1200            }
1201            None => {
1202                // apply no filter
1203                ("transactions".into(), "1 = 1".into())
1204            }
1205        };
1206
1207        let query = format!(
1208            "SELECT {TX_SEQUENCE_NUMBER_STR} FROM {} WHERE {} {} ORDER BY {TX_SEQUENCE_NUMBER_STR} {} LIMIT {}",
1209            table_name, main_where_clause, cursor_clause, order_str, limit,
1210        );
1211
1212        tracing::debug!("query transaction blocks: {}", query);
1213        let pool = self.get_pool();
1214        let tx_sequence_numbers = run_query_async!(&pool, move |conn| {
1215            diesel::sql_query(query.clone()).load::<TxSequenceNumber>(conn)
1216        })?
1217        .into_iter()
1218        .map(|tsn| tsn.tx_sequence_number)
1219        .collect::<Vec<i64>>();
1220        self.multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1221            tx_sequence_numbers,
1222            options,
1223            Some(is_descending),
1224        )
1225        .await
1226    }
1227
1228    async fn multi_get_transaction_block_response_in_blocking_task_impl(
1229        &self,
1230        digests: &[TransactionDigest],
1231        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1232    ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1233        let stored_txes = self
1234            .multi_get_transactions_in_blocking_task(digests.to_vec())
1235            .await?;
1236        self.stored_transaction_to_transaction_block(stored_txes, options)
1237            .await
1238    }
1239
1240    async fn multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1241        &self,
1242        tx_sequence_numbers: Vec<i64>,
1243        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1244        // Some(true) for desc, Some(false) for asc, None for undefined order
1245        is_descending: Option<bool>,
1246    ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1247        let stored_txes: Vec<StoredTransaction> = self
1248            .spawn_blocking(move |this| {
1249                this.multi_get_transactions_with_sequence_numbers(
1250                    tx_sequence_numbers,
1251                    is_descending,
1252                )
1253            })
1254            .await?;
1255        self.stored_transaction_to_transaction_block(stored_txes, options)
1256            .await
1257    }
1258
1259    pub async fn multi_get_transaction_block_response_in_blocking_task(
1260        &self,
1261        digests: Vec<TransactionDigest>,
1262        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1263    ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1264        self.multi_get_transaction_block_response_in_blocking_task_impl(&digests, options)
1265            .await
1266    }
1267
1268    pub async fn get_transaction_events_in_blocking_task(
1269        &self,
1270        digest: TransactionDigest,
1271    ) -> Result<Vec<iota_json_rpc_types::IotaEvent>, IndexerError> {
1272        let pool = self.get_pool();
1273        let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| {
1274            transactions::table
1275                .filter(
1276                    transactions::tx_sequence_number
1277                        .nullable()
1278                        .eq(tx_digests::table
1279                            .select(tx_digests::tx_sequence_number)
1280                            // we filter the tx_digests table because it is indexed by digest,
1281                            // transactions table is not
1282                            .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
1283                            .single_value()),
1284                )
1285                .select((transactions::timestamp_ms, transactions::events))
1286                .first::<(i64, StoredTransactionEvents)>(conn)
1287        })?;
1288
1289        let events = stored_events_to_events(serialized_events)?;
1290        let tx_events = TransactionEvents { data: events };
1291
1292        let iota_tx_events = tx_events_to_iota_tx_events(
1293            tx_events,
1294            self.package_resolver(),
1295            digest,
1296            timestamp_ms as u64,
1297        )
1298        .await?;
1299        Ok(iota_tx_events.map_or(vec![], |ste| ste.data))
1300    }
1301
1302    async fn query_events_by_tx_digest(
1303        &self,
1304        tx_digest: TransactionDigest,
1305        cursor: Option<EventID>,
1306        limit: usize,
1307        descending_order: bool,
1308    ) -> IndexerResult<Vec<IotaEvent>> {
1309        let mut query = events::table.into_boxed();
1310
1311        if let Some(cursor) = cursor {
1312            if cursor.tx_digest != tx_digest {
1313                return Err(IndexerError::InvalidArgument(
1314                    "Cursor tx_digest does not match the tx_digest in the query.".into(),
1315                ));
1316            }
1317            if descending_order {
1318                query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
1319            } else {
1320                query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
1321            }
1322        } else if descending_order {
1323            query = query.filter(events::event_sequence_number.le(i64::MAX));
1324        } else {
1325            query = query.filter(events::event_sequence_number.ge(0));
1326        };
1327
1328        if descending_order {
1329            query = query.order(events::event_sequence_number.desc());
1330        } else {
1331            query = query.order(events::event_sequence_number.asc());
1332        }
1333
1334        query = query.filter(
1335            events::tx_sequence_number.nullable().eq(tx_digests::table
1336                .select(tx_digests::tx_sequence_number)
1337                // we filter the tx_digests table because it is indexed by digest,
1338                // events table is not
1339                .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1340                .single_value()),
1341        );
1342
1343        let pool = self.get_pool();
1344        let stored_events = run_query_async!(&pool, move |conn| {
1345            query.limit(limit as i64).load::<StoredEvent>(conn)
1346        })?;
1347
1348        let mut iota_event_futures = vec![];
1349        for stored_event in stored_events {
1350            iota_event_futures.push(tokio::task::spawn(
1351                stored_event.try_into_iota_event(self.package_resolver.clone()),
1352            ));
1353        }
1354
1355        let iota_events = futures::future::join_all(iota_event_futures)
1356            .await
1357            .into_iter()
1358            .collect::<Result<Vec<_>, _>>()
1359            .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
1360            .into_iter()
1361            .collect::<Result<Vec<_>, _>>()
1362            .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
1363        Ok(iota_events)
1364    }
1365
1366    pub async fn query_events_in_blocking_task(
1367        &self,
1368        filter: EventFilter,
1369        cursor: Option<EventID>,
1370        limit: usize,
1371        descending_order: bool,
1372    ) -> IndexerResult<Vec<IotaEvent>> {
1373        let pool = self.get_pool();
1374        let (tx_seq, event_seq) = if let Some(cursor) = cursor {
1375            let EventID {
1376                tx_digest,
1377                event_seq,
1378            } = cursor;
1379            let tx_seq = run_query_async!(&pool, move |conn| {
1380                transactions::dsl::transactions
1381                    .select(transactions::tx_sequence_number)
1382                    .filter(
1383                        transactions::tx_sequence_number
1384                            .nullable()
1385                            .eq(tx_digests::table
1386                                .select(tx_digests::tx_sequence_number)
1387                                // we filter the tx_digests table because it is indexed by digest,
1388                                // transactions table is not
1389                                .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1390                                .single_value()),
1391                    )
1392                    .first::<i64>(conn)
1393            })?;
1394            (tx_seq, event_seq as i64)
1395        } else if descending_order {
1396            let max_tx_seq = i64::MAX;
1397            let max_event_seq = i64::MAX;
1398            (max_tx_seq, max_event_seq)
1399        } else {
1400            (-1, 0)
1401        };
1402
1403        let query = if let EventFilter::Sender(sender) = &filter {
1404            // Need to remove ambiguities for tx_sequence_number column
1405            let cursor_clause = if descending_order {
1406                format!(
1407                    "(e.{TX_SEQUENCE_NUMBER_STR} < {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} < {}))",
1408                    tx_seq, tx_seq, event_seq
1409                )
1410            } else {
1411                format!(
1412                    "(e.{TX_SEQUENCE_NUMBER_STR} > {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} > {}))",
1413                    tx_seq, tx_seq, event_seq
1414                )
1415            };
1416            let order_clause = if descending_order {
1417                format!("e.{TX_SEQUENCE_NUMBER_STR} DESC, e.{EVENT_SEQUENCE_NUMBER_STR} DESC")
1418            } else {
1419                format!("e.{TX_SEQUENCE_NUMBER_STR} ASC, e.{EVENT_SEQUENCE_NUMBER_STR} ASC")
1420            };
1421            format!(
1422                "( \
1423                    SELECT *
1424                    FROM tx_senders s
1425                    JOIN events e
1426                    ON e.tx_sequence_number = s.tx_sequence_number
1427                    AND s.sender = '\\x{}'::bytea
1428                    WHERE {} \
1429                    ORDER BY {} \
1430                    LIMIT {}
1431                )",
1432                Hex::encode(sender.to_vec()),
1433                cursor_clause,
1434                order_clause,
1435                limit,
1436            )
1437        } else if let EventFilter::Transaction(tx_digest) = filter {
1438            return self
1439                .query_events_by_tx_digest(tx_digest, cursor, limit, descending_order)
1440                .await;
1441        } else {
1442            let main_where_clause = match filter {
1443                EventFilter::Package(package_id) => {
1444                    format!("package = '\\x{}'::bytea", package_id.to_hex())
1445                }
1446                EventFilter::MoveModule { package, module } => {
1447                    format!(
1448                        "package = '\\x{}'::bytea AND module = '{}'",
1449                        package.to_hex(),
1450                        module,
1451                    )
1452                }
1453                EventFilter::MoveEventType(struct_tag) => {
1454                    let formatted_struct_tag = struct_tag.to_canonical_string(true);
1455                    format!("event_type = '{formatted_struct_tag}'")
1456                }
1457                EventFilter::MoveEventModule { package, module } => {
1458                    let package_module_prefix = format!("{}::{}", package.to_hex_literal(), module);
1459                    format!("event_type LIKE '{package_module_prefix}::%'")
1460                }
1461                EventFilter::Sender(_) => {
1462                    // Processed above
1463                    unreachable!()
1464                }
1465                EventFilter::Transaction(_) => {
1466                    // Processed above
1467                    unreachable!()
1468                }
1469                EventFilter::MoveEventField { .. }
1470                | EventFilter::All(_)
1471                | EventFilter::Any(_)
1472                | EventFilter::And(_, _)
1473                | EventFilter::Or(_, _)
1474                | EventFilter::TimeRange { .. } => {
1475                    return Err(IndexerError::NotSupported(
1476                        "This type of EventFilter is not supported.".into(),
1477                    ));
1478                }
1479            };
1480
1481            let cursor_clause = if descending_order {
1482                format!(
1483                    "AND ({TX_SEQUENCE_NUMBER_STR} < {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} < {}))",
1484                    tx_seq, tx_seq, event_seq
1485                )
1486            } else {
1487                format!(
1488                    "AND ({TX_SEQUENCE_NUMBER_STR} > {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} > {}))",
1489                    tx_seq, tx_seq, event_seq
1490                )
1491            };
1492            let order_clause = if descending_order {
1493                format!("{TX_SEQUENCE_NUMBER_STR} DESC, {EVENT_SEQUENCE_NUMBER_STR} DESC")
1494            } else {
1495                format!("{TX_SEQUENCE_NUMBER_STR} ASC, {EVENT_SEQUENCE_NUMBER_STR} ASC")
1496            };
1497
1498            format!(
1499                "
1500                    SELECT * FROM events \
1501                    WHERE {} {} \
1502                    ORDER BY {} \
1503                    LIMIT {}
1504                ",
1505                main_where_clause, cursor_clause, order_clause, limit,
1506            )
1507        };
1508        tracing::debug!("query events: {}", query);
1509        let pool = self.get_pool();
1510        let stored_events = run_query_async!(&pool, move |conn| diesel::sql_query(query)
1511            .load::<StoredEvent>(conn))?;
1512
1513        let mut iota_event_futures = vec![];
1514        for stored_event in stored_events {
1515            iota_event_futures.push(tokio::task::spawn(
1516                stored_event.try_into_iota_event(self.package_resolver.clone()),
1517            ));
1518        }
1519
1520        let iota_events = futures::future::join_all(iota_event_futures)
1521            .await
1522            .into_iter()
1523            .collect::<Result<Vec<_>, _>>()
1524            .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
1525            .into_iter()
1526            .collect::<Result<Vec<_>, _>>()
1527            .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
1528        Ok(iota_events)
1529    }
1530
1531    pub async fn get_dynamic_fields_in_blocking_task(
1532        &self,
1533        parent_object_id: ObjectID,
1534        cursor: Option<ObjectID>,
1535        limit: usize,
1536    ) -> Result<Vec<DynamicFieldInfo>, IndexerError> {
1537        let stored_objects = self
1538            .spawn_blocking(move |this| {
1539                this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1540            })
1541            .await?;
1542
1543        let mut df_futures = vec![];
1544        let indexer_reader_arc = Arc::new(self.clone());
1545        for stored_object in stored_objects {
1546            let indexer_reader_arc_clone = Arc::clone(&indexer_reader_arc);
1547            df_futures.push(tokio::task::spawn(async move {
1548                indexer_reader_arc_clone
1549                    .try_create_dynamic_field_info(stored_object)
1550                    .await
1551            }));
1552        }
1553        let df_infos = futures::future::try_join_all(df_futures)
1554            .await
1555            .tap_err(|e| tracing::error!("Error joining DF futures: {:?}", e))?
1556            .into_iter()
1557            .collect::<Result<Vec<_>, _>>()
1558            .tap_err(|e| {
1559                tracing::error!(
1560                    "Error calling DF try_create_dynamic_field_info function: {:?}",
1561                    e
1562                )
1563            })?
1564            .into_iter()
1565            .flatten()
1566            .collect::<Vec<_>>();
1567        Ok(df_infos)
1568    }
1569
1570    pub async fn get_dynamic_fields_raw_in_blocking_task(
1571        &self,
1572        parent_object_id: ObjectID,
1573        cursor: Option<ObjectID>,
1574        limit: usize,
1575    ) -> Result<Vec<StoredObject>, IndexerError> {
1576        self.spawn_blocking(move |this| {
1577            this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1578        })
1579        .await
1580    }
1581
1582    fn get_dynamic_fields_raw(
1583        &self,
1584        parent_object_id: ObjectID,
1585        cursor: Option<ObjectID>,
1586        limit: usize,
1587    ) -> Result<Vec<StoredObject>, IndexerError> {
1588        let objects: Vec<StoredObject> = run_query!(&self.pool, |conn| {
1589            let mut query = objects::dsl::objects
1590                .filter(objects::dsl::owner_type.eq(OwnerType::Object as i16))
1591                .filter(objects::dsl::owner_id.eq(parent_object_id.to_vec()))
1592                .order(objects::dsl::object_id.asc())
1593                .limit(limit as i64)
1594                .into_boxed();
1595            if let Some(object_cursor) = cursor {
1596                query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
1597            }
1598            query.load::<StoredObject>(conn)
1599        })?;
1600
1601        Ok(objects)
1602    }
1603
1604    async fn try_create_dynamic_field_info(
1605        &self,
1606        stored_object: StoredObject,
1607    ) -> Result<Option<DynamicFieldInfo>, IndexerError> {
1608        if stored_object.df_kind.is_none() {
1609            return Ok(None);
1610        }
1611
1612        let object: Object = stored_object.try_into()?;
1613        let Some(move_object) = object.data.try_as_move().cloned() else {
1614            return Err(IndexerError::ResolveMoveStruct(
1615                "Object is not a MoveObject".to_string(),
1616            ));
1617        };
1618        let type_tag: TypeTag = move_object.type_().clone().into();
1619        let layout = self
1620            .package_resolver
1621            .type_layout(type_tag.clone())
1622            .await
1623            .map_err(|e| {
1624                IndexerError::ResolveMoveStruct(format!(
1625                    "Failed to get type layout for type {}: {e}",
1626                    type_tag.to_canonical_display(/* with_prefix */ true),
1627                ))
1628            })?;
1629
1630        let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
1631            .tap_err(|e| tracing::warn!("{e}"))?;
1632
1633        let type_ = field.kind;
1634        let name_type: TypeTag = field.name_layout.into();
1635        let bcs_name = field.name_bytes.to_owned();
1636
1637        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
1638            .tap_err(|e| tracing::warn!("{e}"))?;
1639
1640        let name = DynamicFieldName {
1641            type_: name_type,
1642            value: IotaMoveValue::from(name_value).to_json_value(),
1643        };
1644
1645        let value_metadata = field.value_metadata().map_err(|e| {
1646            tracing::warn!("{e}");
1647            IndexerError::Uncategorized(anyhow!(e))
1648        })?;
1649
1650        Ok(Some(match value_metadata {
1651            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
1652                name,
1653                bcs_name,
1654                type_,
1655                object_type: object_type.to_canonical_string(/* with_prefix */ true),
1656                object_id: object.id(),
1657                version: object.version(),
1658                digest: object.digest(),
1659            },
1660
1661            DFV::ValueMetadata::DynamicObjectField(object_id) => {
1662                let object = self
1663                    .get_object_in_blocking_task(object_id)
1664                    .await?
1665                    .ok_or_else(|| {
1666                        IndexerError::Uncategorized(anyhow!(
1667                            "Failed to find object_id {} when trying to create dynamic field info",
1668                            object_id.to_canonical_display(/* with_prefix */ true),
1669                        ))
1670                    })?;
1671
1672                let object_type = object.data.type_().unwrap().clone();
1673                DynamicFieldInfo {
1674                    name,
1675                    bcs_name,
1676                    type_,
1677                    object_type: object_type.to_canonical_string(/* with_prefix */ true),
1678                    object_id,
1679                    version: object.version(),
1680                    digest: object.digest(),
1681                }
1682            }
1683        }))
1684    }
1685
1686    pub async fn bcs_name_from_dynamic_field_name(
1687        &self,
1688        name: &DynamicFieldName,
1689    ) -> Result<Vec<u8>, IndexerError> {
1690        let move_type_layout = self
1691            .package_resolver()
1692            .type_layout(name.type_.clone())
1693            .await
1694            .map_err(|e| {
1695                IndexerError::ResolveMoveStruct(format!(
1696                    "Failed to get type layout for type {}: {}",
1697                    name.type_, e
1698                ))
1699            })?;
1700        let iota_json_value = iota_json::IotaJsonValue::new(name.value.clone())?;
1701        let name_bcs_value = iota_json_value.to_bcs_bytes(&move_type_layout)?;
1702        Ok(name_bcs_value)
1703    }
1704
1705    pub async fn get_display_object_by_type(
1706        &self,
1707        object_type: &move_core_types::language_storage::StructTag,
1708    ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1709        let object_type = object_type.to_canonical_string(/* with_prefix */ true);
1710        self.spawn_blocking(move |this| this.get_display_update_event(object_type))
1711            .await
1712    }
1713
1714    fn get_display_update_event(
1715        &self,
1716        object_type: String,
1717    ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1718        let stored_display = run_query!(&self.pool, |conn| {
1719            display::table
1720                .filter(display::object_type.eq(object_type))
1721                .first::<StoredDisplay>(conn)
1722                .optional()
1723        })?;
1724
1725        let stored_display = match stored_display {
1726            Some(display) => display,
1727            None => return Ok(None),
1728        };
1729
1730        let display_update = stored_display.to_display_update_event()?;
1731
1732        Ok(Some(display_update))
1733    }
1734
1735    pub async fn get_owned_coins_in_blocking_task(
1736        &self,
1737        owner: IotaAddress,
1738        coin_type: Option<String>,
1739        cursor: ObjectID,
1740        limit: usize,
1741    ) -> Result<Vec<IotaCoin>, IndexerError> {
1742        self.spawn_blocking(move |this| this.get_owned_coins(owner, coin_type, cursor, limit))
1743            .await
1744    }
1745
1746    fn get_owned_coins(
1747        &self,
1748        owner: IotaAddress,
1749        // If coin_type is None, look for all coins.
1750        coin_type: Option<String>,
1751        cursor: ObjectID,
1752        limit: usize,
1753    ) -> Result<Vec<IotaCoin>, IndexerError> {
1754        let mut query = objects::dsl::objects
1755            .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
1756            .filter(objects::dsl::owner_id.eq(owner.to_vec()))
1757            .filter(objects::dsl::object_id.gt(cursor.to_vec()))
1758            .into_boxed();
1759        if let Some(coin_type) = coin_type {
1760            query = query.filter(objects::dsl::coin_type.eq(Some(coin_type)));
1761        } else {
1762            query = query.filter(objects::dsl::coin_type.is_not_null());
1763        }
1764        query = query
1765            .order(objects::dsl::object_id.asc())
1766            .limit(limit as i64);
1767
1768        let stored_objects = run_query!(&self.pool, |conn| query.load::<StoredObject>(conn))?;
1769
1770        stored_objects
1771            .into_iter()
1772            .map(|o| o.try_into())
1773            .collect::<IndexerResult<Vec<_>>>()
1774    }
1775
1776    pub async fn get_coin_balances_in_blocking_task(
1777        &self,
1778        owner: IotaAddress,
1779        // If coin_type is None, look for all coins.
1780        coin_type: Option<String>,
1781    ) -> Result<Vec<Balance>, IndexerError> {
1782        self.spawn_blocking(move |this| this.get_coin_balances(owner, coin_type))
1783            .await
1784    }
1785
1786    fn get_coin_balances(
1787        &self,
1788        owner: IotaAddress,
1789        // If coin_type is None, look for all coins.
1790        coin_type: Option<String>,
1791    ) -> Result<Vec<Balance>, IndexerError> {
1792        let coin_type_filter = if let Some(coin_type) = coin_type {
1793            format!("= '{}'", coin_type)
1794        } else {
1795            "IS NOT NULL".to_string()
1796        };
1797        // Note: important to cast to BIGINT to avoid deserialize confusion
1798        let query = format!(
1799            "
1800            SELECT coin_type, \
1801            CAST(COUNT(*) AS BIGINT) AS coin_num, \
1802            CAST(SUM(coin_balance) AS BIGINT) AS coin_balance \
1803            FROM objects \
1804            WHERE owner_type = {} \
1805            AND owner_id = '\\x{}'::BYTEA \
1806            AND coin_type {} \
1807            GROUP BY coin_type \
1808            ORDER BY coin_type ASC
1809        ",
1810            OwnerType::Address as i16,
1811            Hex::encode(owner.to_vec()),
1812            coin_type_filter,
1813        );
1814
1815        tracing::debug!("get coin balances query: {query}");
1816        let coin_balances = run_query!(&self.pool, |conn| diesel::sql_query(query)
1817            .load::<CoinBalance>(conn))?;
1818        coin_balances
1819            .into_iter()
1820            .map(|cb| cb.try_into())
1821            .collect::<IndexerResult<Vec<_>>>()
1822    }
1823
1824    pub fn get_latest_network_metrics(&self) -> IndexerResult<NetworkMetrics> {
1825        let mut metrics = run_query!(&self.pool, |conn| {
1826            diesel::sql_query("SELECT * FROM network_metrics;")
1827                .get_result::<StoredNetworkMetrics>(conn)
1828        })?;
1829        if metrics.total_addresses == -1 {
1830            // this implies that the estimate is not available in the db
1831            // so we fallback to the more expensive count query
1832            metrics.total_addresses = run_query!(&self.pool, |conn| {
1833                addresses::dsl::addresses.count().get_result::<i64>(conn)
1834            })?;
1835        }
1836        if metrics.total_packages == -1 {
1837            // this implies that the estimate is not available in the db
1838            // so we fallback to the more expensive count query
1839            metrics.total_packages = run_query!(&self.pool, |conn| {
1840                packages::dsl::packages.count().get_result::<i64>(conn)
1841            })?;
1842        }
1843        Ok(metrics.into())
1844    }
1845
1846    /// Get the latest move call metrics.
1847    pub fn get_latest_move_call_metrics(&self) -> IndexerResult<MoveCallMetrics> {
1848        let latest_3_days = self.get_latest_move_call_metrics_by_day(3)?;
1849        let latest_7_days = self.get_latest_move_call_metrics_by_day(7)?;
1850        let latest_30_days = self.get_latest_move_call_metrics_by_day(30)?;
1851
1852        // sort by call count desc.
1853        let rank_3_days = latest_3_days
1854            .into_iter()
1855            .sorted_by(|a, b| b.1.cmp(&a.1))
1856            .collect::<Vec<_>>();
1857        let rank_7_days = latest_7_days
1858            .into_iter()
1859            .sorted_by(|a, b| b.1.cmp(&a.1))
1860            .collect::<Vec<_>>();
1861        let rank_30_days = latest_30_days
1862            .into_iter()
1863            .sorted_by(|a, b| b.1.cmp(&a.1))
1864            .collect::<Vec<_>>();
1865
1866        Ok(MoveCallMetrics {
1867            rank_3_days,
1868            rank_7_days,
1869            rank_30_days,
1870        })
1871    }
1872
1873    /// Get the latest move call metrics by day.
1874    pub fn get_latest_move_call_metrics_by_day(
1875        &self,
1876        day_value: i64,
1877    ) -> IndexerResult<Vec<(MoveFunctionName, usize)>> {
1878        let query = "
1879            SELECT id, epoch, day, move_package, move_module, move_function, count
1880            FROM move_call_metrics
1881            WHERE day = $1
1882              AND epoch = (SELECT MAX(epoch) FROM move_call_metrics WHERE day = $1)
1883            ORDER BY count DESC
1884            LIMIT 10
1885        ";
1886
1887        let queried_metrics = run_query!(&self.pool, |conn| sql_query(query)
1888            .bind::<BigInt, _>(day_value)
1889            .load::<QueriedMoveCallMetrics>(conn))?;
1890
1891        let metrics = queried_metrics
1892            .into_iter()
1893            .map(|m| {
1894                m.try_into()
1895                    .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))
1896            })
1897            .collect::<Result<Vec<_>, _>>()?;
1898
1899        Ok(metrics)
1900    }
1901
1902    pub fn get_latest_address_metrics(&self) -> IndexerResult<AddressMetrics> {
1903        let stored_address_metrics = run_query!(&self.pool, |conn| {
1904            address_metrics::table
1905                .order(address_metrics::dsl::checkpoint.desc())
1906                .first::<StoredAddressMetrics>(conn)
1907        })?;
1908        Ok(stored_address_metrics.into())
1909    }
1910
1911    pub fn get_checkpoint_address_metrics(
1912        &self,
1913        checkpoint_seq: u64,
1914    ) -> IndexerResult<AddressMetrics> {
1915        let stored_address_metrics = run_query!(&self.pool, |conn| {
1916            address_metrics::table
1917                .filter(address_metrics::dsl::checkpoint.eq(checkpoint_seq as i64))
1918                .first::<StoredAddressMetrics>(conn)
1919        })?;
1920        Ok(stored_address_metrics.into())
1921    }
1922
1923    pub fn get_all_epoch_address_metrics(
1924        &self,
1925        descending_order: Option<bool>,
1926    ) -> IndexerResult<Vec<AddressMetrics>> {
1927        let is_descending = descending_order.unwrap_or_default();
1928        let epoch_address_metrics_query = format!(
1929            "WITH ranked_rows AS (
1930                SELECT
1931                  checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses,
1932                  row_number() OVER(PARTITION BY epoch ORDER BY checkpoint DESC) as row_num
1933                FROM
1934                  address_metrics
1935              )
1936              SELECT
1937                checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses
1938              FROM ranked_rows
1939              WHERE row_num = 1 ORDER BY epoch {}",
1940            if is_descending { "DESC" } else { "ASC" },
1941        );
1942        let epoch_address_metrics = run_query!(&self.pool, |conn| {
1943            diesel::sql_query(epoch_address_metrics_query).load::<StoredAddressMetrics>(conn)
1944        })?;
1945
1946        Ok(epoch_address_metrics
1947            .into_iter()
1948            .map(|stored_address_metrics| stored_address_metrics.into())
1949            .collect())
1950    }
1951
1952    pub(crate) async fn get_display_fields(
1953        &self,
1954        original_object: &iota_types::object::Object,
1955        original_layout: &Option<MoveStructLayout>,
1956    ) -> Result<DisplayFieldsResponse, IndexerError> {
1957        let (object_type, layout) = if let Some((object_type, layout)) =
1958            iota_json_rpc::read_api::get_object_type_and_struct(original_object, original_layout)
1959                .map_err(|e| IndexerError::Generic(e.to_string()))?
1960        {
1961            (object_type, layout)
1962        } else {
1963            return Ok(DisplayFieldsResponse {
1964                data: None,
1965                error: None,
1966            });
1967        };
1968
1969        if let Some(display_object) = self.get_display_object_by_type(&object_type).await? {
1970            return iota_json_rpc::read_api::get_rendered_fields(display_object.fields, &layout)
1971                .map_err(|e| IndexerError::Generic(e.to_string()));
1972        }
1973        Ok(DisplayFieldsResponse {
1974            data: None,
1975            error: None,
1976        })
1977    }
1978
1979    pub async fn get_coin_metadata_in_blocking_task(
1980        &self,
1981        coin_struct: StructTag,
1982    ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1983        self.spawn_blocking(move |this| this.get_coin_metadata(coin_struct))
1984            .await
1985    }
1986
1987    fn get_coin_metadata(
1988        &self,
1989        coin_struct: StructTag,
1990    ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1991        let coin_metadata_type = CoinMetadata::type_(coin_struct.clone());
1992        let metadata_object = self
1993            .get_singleton_object(coin_metadata_type)?
1994            .and_then(|o| IotaCoinMetadata::try_from(o).ok());
1995
1996        if let Some(metadata_object) = metadata_object {
1997            Ok(Some(metadata_object))
1998        } else {
1999            let coin_manager_obj = self.get_coin_manager_obj(coin_struct)?;
2000            Ok(
2001                coin_manager_obj.and_then(|m| match (m.metadata, m.immutable_metadata) {
2002                    (Some(metadata), _) => Some(metadata.into()),
2003                    (_, Some(immutable_metadata)) => Some(IotaCoinMetadata {
2004                        decimals: immutable_metadata.decimals,
2005                        name: immutable_metadata.name,
2006                        symbol: immutable_metadata.symbol,
2007                        description: immutable_metadata.description,
2008                        icon_url: immutable_metadata.icon_url,
2009                        id: None,
2010                    }),
2011                    (None, None) => None,
2012                }),
2013            )
2014        }
2015    }
2016
2017    fn get_coin_manager_obj(
2018        &self,
2019        coin_type: StructTag,
2020    ) -> Result<Option<CoinManager>, IndexerError> {
2021        let coin_manager_type = CoinManager::type_(coin_type);
2022        let coin_manager_object = self
2023            .get_singleton_object(coin_manager_type)?
2024            .and_then(|o| CoinManager::try_from(o).ok());
2025        Ok(coin_manager_object)
2026    }
2027
2028    pub async fn get_total_supply_in_blocking_task(
2029        &self,
2030        coin_struct: StructTag,
2031    ) -> Result<Supply, IndexerError> {
2032        self.spawn_blocking(move |this| this.get_total_supply(coin_struct))
2033            .await
2034    }
2035
2036    fn get_total_supply(&self, coin_struct: StructTag) -> Result<Supply, IndexerError> {
2037        let package_id = coin_struct.address.into();
2038        let treasury_cap_type =
2039            TreasuryCap::type_(coin_struct).to_canonical_string(/* with_prefix */ true);
2040        let treasury_cap_obj_id = self
2041            .package_obj_type_cache
2042            .lock()
2043            .unwrap()
2044            .cache_get_or_set_with(format!("{}{}", package_id, treasury_cap_type), || {
2045                get_single_obj_id_from_package_publish(self, package_id, treasury_cap_type.clone())
2046                    .unwrap()
2047            })
2048            .ok_or(IndexerError::Generic(format!(
2049                "Cannot find treasury cap for type {}",
2050                treasury_cap_type
2051            )))?;
2052        let treasury_cap_obj_object =
2053            self.get_object(&treasury_cap_obj_id, None)?
2054                .ok_or(IndexerError::Generic(format!(
2055                    "Cannot find treasury cap object with id {}",
2056                    treasury_cap_obj_id
2057                )))?;
2058        Ok(TreasuryCap::try_from(treasury_cap_obj_object)?.total_supply)
2059    }
2060
2061    pub fn get_consistent_read_range(&self) -> Result<(i64, i64), IndexerError> {
2062        let latest_checkpoint_sequence = run_query!(&self.pool, |conn| {
2063            checkpoints::table
2064                .select(checkpoints::sequence_number)
2065                .order(checkpoints::sequence_number.desc())
2066                .first::<i64>(conn)
2067                .optional()
2068        })?
2069        .unwrap_or_default();
2070        let latest_object_snapshot_checkpoint_sequence = run_query!(&self.pool, |conn| {
2071            objects_snapshot::table
2072                .select(objects_snapshot::checkpoint_sequence_number)
2073                .order(objects_snapshot::checkpoint_sequence_number.desc())
2074                .first::<i64>(conn)
2075                .optional()
2076        })?
2077        .unwrap_or_default();
2078        Ok((
2079            latest_object_snapshot_checkpoint_sequence,
2080            latest_checkpoint_sequence,
2081        ))
2082    }
2083
2084    pub fn package_resolver(&self) -> PackageResolver {
2085        self.package_resolver.clone()
2086    }
2087
2088    pub async fn pending_active_validators(
2089        &self,
2090    ) -> Result<Vec<IotaValidatorSummary>, IndexerError> {
2091        self.spawn_blocking(move |this| {
2092            iota_types::iota_system_state::get_iota_system_state(&this)
2093                .and_then(|system_state| system_state.get_pending_active_validators(&this))
2094        })
2095        .await
2096        .map_err(Into::into)
2097    }
2098
2099    /// Get the participation metrics. Participation is defined as the total
2100    /// number of unique addresses that have delegated stake in the current
2101    /// epoch. Includes both staked and timelocked staked IOTA.
2102    pub fn get_participation_metrics(&self) -> IndexerResult<ParticipationMetrics> {
2103        run_query!(&self.pool, |conn| {
2104            diesel::sql_query("SELECT * FROM participation_metrics")
2105                .get_result::<StoredParticipationMetrics>(conn)
2106        })
2107        .map(Into::into)
2108    }
2109}
2110
2111impl iota_types::storage::ObjectStore for IndexerReader {
2112    fn get_object(
2113        &self,
2114        object_id: &ObjectID,
2115    ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2116        self.get_object(object_id, None)
2117            .map_err(iota_types::storage::error::Error::custom)
2118    }
2119
2120    fn get_object_by_key(
2121        &self,
2122        object_id: &ObjectID,
2123        version: iota_types::base_types::VersionNumber,
2124    ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2125        self.get_object(object_id, Some(version))
2126            .map_err(iota_types::storage::error::Error::custom)
2127    }
2128}
2129
2130fn get_single_obj_id_from_package_publish(
2131    reader: &IndexerReader,
2132    package_id: ObjectID,
2133    obj_type: String,
2134) -> Result<Option<ObjectID>, IndexerError> {
2135    let publish_txn_effects_opt = if is_system_package(package_id) {
2136        Some(reader.get_transaction_effects_with_sequence_number(0))
2137    } else {
2138        reader.get_object(&package_id, None)?.map(|o| {
2139            let publish_txn_digest = o.previous_transaction;
2140            reader.get_transaction_effects_with_digest(publish_txn_digest)
2141        })
2142    };
2143    if let Some(publish_txn_effects) = publish_txn_effects_opt {
2144        let created_objs = publish_txn_effects?
2145            .created()
2146            .iter()
2147            .map(|o| o.object_id())
2148            .collect::<Vec<_>>();
2149        let obj_ids_with_type =
2150            reader.filter_object_id_with_type(created_objs, obj_type.clone())?;
2151        if obj_ids_with_type.len() == 1 {
2152            Ok(Some(obj_ids_with_type[0]))
2153        } else if obj_ids_with_type.is_empty() {
2154            // The package exists but no such object is created in that transaction. Or
2155            // maybe it is wrapped and we don't know yet.
2156            Ok(None)
2157        } else {
2158            // We expect there to be only one object of this type created by the package but
2159            // more than one is found.
2160            tracing::error!(
2161                "There are more than one objects found for type {}",
2162                obj_type
2163            );
2164            Ok(None)
2165        }
2166    } else {
2167        // The coin package does not exist.
2168        Ok(None)
2169    }
2170}