iota_indexer/
indexer_reader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashSet,
7    sync::{Arc, Mutex},
8};
9
10use anyhow::{Result, anyhow};
11use cached::{Cached, SizedCache};
12use diesel::{
13    ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, PgConnection,
14    QueryDsl, RunQueryDsl, SelectableHelper, TextExpressionMethods,
15    dsl::sql,
16    r2d2::ConnectionManager,
17    sql_query,
18    sql_types::{BigInt, Bool},
19};
20use fastcrypto::encoding::{Encoding, Hex};
21use iota_json_rpc_types::{
22    AddressMetrics, Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo,
23    EventFilter, IotaCoinMetadata, IotaEvent, IotaMoveValue, IotaObjectDataFilter,
24    IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
25    IotaTransactionKind, MoveCallMetrics, MoveFunctionName, NetworkMetrics, ParticipationMetrics,
26    TransactionFilter,
27};
28use iota_package_resolver::{Package, PackageStore, PackageStoreWithLruCache, Resolver};
29use iota_types::{
30    TypeTag,
31    balance::Supply,
32    base_types::{IotaAddress, ObjectID, SequenceNumber, VersionNumber},
33    coin::{CoinMetadata, TreasuryCap},
34    committee::EpochId,
35    digests::{ChainIdentifier, TransactionDigest},
36    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
37    effects::TransactionEvents,
38    event::EventID,
39    iota_system_state::{
40        IotaSystemStateTrait,
41        iota_system_state_summary::{IotaSystemStateSummary, IotaValidatorSummary},
42    },
43    is_system_package,
44    messages_checkpoint::CheckpointDigest,
45    object::{Object, ObjectRead, PastObjectRead, bounded_visitor::BoundedVisitor},
46};
47use itertools::Itertools;
48use move_core_types::{annotated_value::MoveStructLayout, language_storage::StructTag};
49use tap::TapFallible;
50
51use crate::{
52    db::{ConnectionConfig, ConnectionPool, ConnectionPoolConfig},
53    errors::IndexerError,
54    models::{
55        address_metrics::StoredAddressMetrics,
56        checkpoints::{StoredChainIdentifier, StoredCheckpoint},
57        display::StoredDisplay,
58        epoch::StoredEpochInfo,
59        events::StoredEvent,
60        move_call_metrics::QueriedMoveCallMetrics,
61        network_metrics::StoredNetworkMetrics,
62        obj_indices::StoredObjectVersion,
63        objects::{CoinBalance, StoredHistoryObject, StoredObject},
64        participation_metrics::StoredParticipationMetrics,
65        transactions::{
66            OptimisticTransaction, StoredTransaction, StoredTransactionEvents,
67            stored_events_to_events, tx_events_to_iota_tx_events,
68        },
69        tx_indices::TxSequenceNumber,
70    },
71    schema::{
72        address_metrics, addresses, chain_identifier, checkpoints, display, epochs, events,
73        objects, objects_history, objects_snapshot, objects_version, optimistic_transactions,
74        packages, pruner_cp_watermark, transactions, tx_digests, tx_insertion_order,
75    },
76    store::{diesel_macro::*, package_resolver::IndexerStorePackageResolver},
77    types::{IndexerResult, OwnerType},
78};
79
/// The identifier `tx_sequence_number` as a string.
/// NOTE(review): presumably used as a column / cursor key by callers — usage
/// is outside this view, confirm.
pub const TX_SEQUENCE_NUMBER_STR: &str = "tx_sequence_number";
/// The identifier `transaction_digest` as a string.
/// NOTE(review): presumably a column / cursor key — confirm against callers.
pub const TRANSACTION_DIGEST_STR: &str = "transaction_digest";
/// The identifier `event_sequence_number` as a string.
/// NOTE(review): presumably a column / cursor key — confirm against callers.
pub const EVENT_SEQUENCE_NUMBER_STR: &str = "event_sequence_number";
83
/// Read-side handle to the indexer database.
///
/// Bundles the connection pool used by the queries in this file with a
/// package resolver and a size-bounded cache.
pub struct IndexerReader {
    /// Connection pool every query in this file runs against.
    pool: ConnectionPool,
    /// Shared package resolver handed to the `try_into_*` conversions that
    /// need to resolve package data.
    package_resolver: PackageResolver,
    /// Size-bounded cache from a string key to an optional `ObjectID`,
    /// shared behind a mutex.
    /// NOTE(review): the meaning of the key is not visible in this part of
    /// the file — confirm against the code that populates it.
    package_obj_type_cache: Arc<Mutex<SizedCache<String, Option<ObjectID>>>>,
}
89
90impl Clone for IndexerReader {
91    fn clone(&self) -> IndexerReader {
92        IndexerReader {
93            pool: self.pool.clone(),
94            package_resolver: self.package_resolver.clone(),
95            package_obj_type_cache: self.package_obj_type_cache.clone(),
96        }
97    }
98}
99
/// Shared (`Arc`) package resolver whose package store is an
/// [`IndexerStorePackageResolver`] wrapped in an LRU cache.
pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePackageResolver>>>;
101
102// Impl for common initialization and utilities
103impl IndexerReader {
104    pub fn new<T: Into<String>>(db_url: T) -> Result<Self> {
105        let config = ConnectionPoolConfig::default();
106        Self::new_with_config(db_url, config)
107    }
108
109    pub fn new_with_config<T: Into<String>>(
110        db_url: T,
111        config: ConnectionPoolConfig,
112    ) -> Result<Self> {
113        let manager = ConnectionManager::<PgConnection>::new(db_url);
114
115        let connection_config = ConnectionConfig {
116            statement_timeout: config.statement_timeout,
117            read_only: true,
118        };
119
120        let pool = diesel::r2d2::Pool::builder()
121            .max_size(config.pool_size)
122            .connection_timeout(config.connection_timeout)
123            .connection_customizer(Box::new(connection_config))
124            .build(manager)
125            .map_err(|e| anyhow!("Failed to initialize connection pool. Error: {:?}. If Error is None, please check whether the configured pool size (currently {}) exceeds the maximum number of connections allowed by the database.", e, config.pool_size))?;
126
127        let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
128        let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
129        let package_resolver = Arc::new(Resolver::new(package_cache));
130        let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000)));
131        Ok(Self {
132            pool,
133            package_resolver,
134            package_obj_type_cache,
135        })
136    }
137
138    pub async fn spawn_blocking<F, R, E>(&self, f: F) -> Result<R, E>
139    where
140        F: FnOnce(Self) -> Result<R, E> + Send + 'static,
141        R: Send + 'static,
142        E: Send + 'static,
143    {
144        let this = self.clone();
145        let current_span = tracing::Span::current();
146        tokio::task::spawn_blocking(move || {
147            CALLED_FROM_BLOCKING_POOL
148                .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
149            let _guard = current_span.enter();
150            f(this)
151        })
152        .await
153        .expect("propagate any panics")
154    }
155
156    pub fn get_pool(&self) -> ConnectionPool {
157        self.pool.clone()
158    }
159}
160
161// Impl for reading data from the DB
162impl IndexerReader {
163    fn get_object_from_db(
164        &self,
165        object_id: &ObjectID,
166        version: Option<VersionNumber>,
167    ) -> Result<Option<StoredObject>, IndexerError> {
168        let object_id = object_id.to_vec();
169
170        let stored_object = run_query!(&self.pool, |conn| {
171            if let Some(version) = version {
172                objects::dsl::objects
173                    .filter(objects::dsl::object_id.eq(object_id))
174                    .filter(objects::dsl::object_version.eq(version.value() as i64))
175                    .first::<StoredObject>(conn)
176                    .optional()
177            } else {
178                objects::dsl::objects
179                    .filter(objects::dsl::object_id.eq(object_id))
180                    .first::<StoredObject>(conn)
181                    .optional()
182            }
183        })?;
184        Ok(stored_object)
185    }
186
187    fn get_object(
188        &self,
189        object_id: &ObjectID,
190        version: Option<VersionNumber>,
191    ) -> Result<Option<Object>, IndexerError> {
192        let Some(stored_package) = self.get_object_from_db(object_id, version)? else {
193            return Ok(None);
194        };
195
196        let object = stored_package.try_into()?;
197        Ok(Some(object))
198    }
199
200    pub async fn get_object_in_blocking_task(
201        &self,
202        object_id: ObjectID,
203    ) -> Result<Option<Object>, IndexerError> {
204        self.spawn_blocking(move |this| this.get_object(&object_id, None))
205            .await
206    }
207
208    pub async fn get_object_read_in_blocking_task(
209        &self,
210        object_id: ObjectID,
211    ) -> Result<ObjectRead, IndexerError> {
212        let stored_object = self
213            .spawn_blocking(move |this| this.get_object_raw(object_id))
214            .await?;
215
216        if let Some(object) = stored_object {
217            object
218                .try_into_object_read(self.package_resolver.clone())
219                .await
220        } else {
221            Ok(ObjectRead::NotExists(object_id))
222        }
223    }
224
225    fn get_object_raw(&self, object_id: ObjectID) -> Result<Option<StoredObject>, IndexerError> {
226        let id = object_id.to_vec();
227        let stored_object = run_query!(&self.pool, |conn| {
228            objects::dsl::objects
229                .filter(objects::dsl::object_id.eq(id))
230                .first::<StoredObject>(conn)
231                .optional()
232        })?;
233        Ok(stored_object)
234    }
235
236    /// Fetches a past object by its ID and version.
237    ///
238    /// - If `before_version` is `false`, it looks for the exact version.
239    /// - If `true`, it finds the latest version before the given one.
240    ///
241    /// Searches the requested object version and checkpoint sequence number
242    /// in `objects_version` and fetches the requested object from
243    /// `objects_history`.
244    pub(crate) async fn get_past_object_read(
245        &self,
246        object_id: ObjectID,
247        object_version: SequenceNumber,
248        before_version: bool,
249    ) -> Result<PastObjectRead, IndexerError> {
250        let object_version_num = object_version.value() as i64;
251
252        // Query objects_version to find the requested version and relevant
253        // checkpoint sequence number considering the `before_version` flag.
254        let pool = self.get_pool();
255        let object_id_bytes = object_id.to_vec();
256        let object_version_info: Option<StoredObjectVersion> =
257            run_query_async!(&pool, move |conn| {
258                let mut query = objects_version::dsl::objects_version
259                    .filter(objects_version::object_id.eq(&object_id_bytes))
260                    .into_boxed();
261
262                if before_version {
263                    query = query.filter(objects_version::object_version.lt(object_version_num));
264                } else {
265                    query = query.filter(objects_version::object_version.eq(object_version_num));
266                }
267
268                query
269                    .order_by(objects_version::object_version.desc())
270                    .limit(1)
271                    .first::<StoredObjectVersion>(conn)
272                    .optional()
273            })?;
274
275        let Some(object_version_info) = object_version_info else {
276            // Check if the object ever existed.
277            let pool = self.get_pool();
278            let object_id_bytes = object_id.to_vec();
279            let latest_existing_version: Option<i64> = run_query_async!(&pool, move |conn| {
280                objects_version::dsl::objects_version
281                    .filter(objects_version::object_id.eq(&object_id_bytes))
282                    .order_by(objects_version::object_version.desc())
283                    .select(objects_version::object_version)
284                    .limit(1)
285                    .first::<i64>(conn)
286                    .optional()
287            })?;
288
289            return match latest_existing_version {
290                Some(latest) if object_version_num > latest => Ok(PastObjectRead::VersionTooHigh {
291                    object_id,
292                    asked_version: object_version,
293                    latest_version: SequenceNumber::from(latest as u64),
294                }),
295                Some(_) => Ok(PastObjectRead::VersionNotFound(object_id, object_version)),
296                None => Ok(PastObjectRead::ObjectNotExists(object_id)),
297            };
298        };
299
300        // Query objects_history for the object with the requested version.
301        let history_object = self
302            .get_stored_history_object(
303                object_id,
304                object_version_info.object_version,
305                object_version_info.cp_sequence_number,
306            )
307            .await?;
308
309        match history_object {
310            Some(obj) => {
311                obj.try_into_past_object_read(self.package_resolver.clone())
312                    .await
313            }
314            None => Err(IndexerError::PersistentStorageDataCorruption(format!(
315                "Object version {} not found in objects_history for object {}",
316                object_version_info.object_version, object_id
317            ))),
318        }
319    }
320
321    pub async fn get_stored_history_object(
322        &self,
323        object_id: ObjectID,
324        object_version: i64,
325        checkpoint_sequence_number: i64,
326    ) -> Result<Option<StoredHistoryObject>, IndexerError> {
327        let pool = self.get_pool();
328        let object_id_bytes = object_id.to_vec();
329        run_query_async!(&pool, move |conn| {
330            // Match on the primary key.
331            let query = objects_history::dsl::objects_history
332                .filter(objects_history::checkpoint_sequence_number.eq(checkpoint_sequence_number))
333                .filter(objects_history::object_id.eq(&object_id_bytes))
334                .filter(objects_history::object_version.eq(object_version))
335                .into_boxed();
336
337            query
338                .order_by(objects_history::object_version.desc())
339                .limit(1)
340                .first::<StoredHistoryObject>(conn)
341                .optional()
342        })
343    }
344
345    pub async fn get_package(&self, package_id: ObjectID) -> Result<Package, IndexerError> {
346        let store = self.package_resolver.package_store();
347        let pkg = store
348            .fetch(package_id.into())
349            .await
350            .map_err(|e| {
351                IndexerError::PostgresRead(format!(
352                    "Fail to fetch package from package store with error {:?}",
353                    e
354                ))
355            })?
356            .as_ref()
357            .clone();
358        Ok(pkg)
359    }
360
361    pub fn get_epoch_info_from_db(
362        &self,
363        epoch: Option<EpochId>,
364    ) -> Result<Option<StoredEpochInfo>, IndexerError> {
365        let stored_epoch = run_query!(&self.pool, |conn| {
366            if let Some(epoch) = epoch {
367                epochs::dsl::epochs
368                    .filter(epochs::epoch.eq(epoch as i64))
369                    .first::<StoredEpochInfo>(conn)
370                    .optional()
371            } else {
372                epochs::dsl::epochs
373                    .order_by(epochs::epoch.desc())
374                    .first::<StoredEpochInfo>(conn)
375                    .optional()
376            }
377        })?;
378
379        Ok(stored_epoch)
380    }
381
382    pub fn get_latest_epoch_info_from_db(&self) -> Result<StoredEpochInfo, IndexerError> {
383        let stored_epoch = run_query!(&self.pool, |conn| {
384            epochs::dsl::epochs
385                .order_by(epochs::epoch.desc())
386                .first::<StoredEpochInfo>(conn)
387        })?;
388
389        Ok(stored_epoch)
390    }
391
392    pub fn get_epoch_info(
393        &self,
394        epoch: Option<EpochId>,
395    ) -> Result<Option<EpochInfo>, IndexerError> {
396        let stored_epoch = self.get_epoch_info_from_db(epoch)?;
397
398        let stored_epoch = match stored_epoch {
399            Some(stored_epoch) => stored_epoch,
400            None => return Ok(None),
401        };
402
403        let epoch_info = EpochInfo::try_from(stored_epoch)?;
404        Ok(Some(epoch_info))
405    }
406
407    fn get_epochs_from_db(
408        &self,
409        cursor: Option<u64>,
410        limit: usize,
411        descending_order: bool,
412    ) -> Result<Vec<StoredEpochInfo>, IndexerError> {
413        run_query!(&self.pool, |conn| {
414            let mut boxed_query = epochs::table.into_boxed();
415            if let Some(cursor) = cursor {
416                if descending_order {
417                    boxed_query = boxed_query.filter(epochs::epoch.lt(cursor as i64));
418                } else {
419                    boxed_query = boxed_query.filter(epochs::epoch.gt(cursor as i64));
420                }
421            }
422            if descending_order {
423                boxed_query = boxed_query.order_by(epochs::epoch.desc());
424            } else {
425                boxed_query = boxed_query.order_by(epochs::epoch.asc());
426            }
427
428            boxed_query.limit(limit as i64).load(conn)
429        })
430    }
431
432    pub fn get_epochs(
433        &self,
434        cursor: Option<u64>,
435        limit: usize,
436        descending_order: bool,
437    ) -> Result<Vec<EpochInfo>, IndexerError> {
438        self.get_epochs_from_db(cursor, limit, descending_order)?
439            .into_iter()
440            .map(EpochInfo::try_from)
441            .collect::<Result<Vec<_>, _>>()
442    }
443
444    pub fn get_latest_iota_system_state(&self) -> Result<IotaSystemStateSummary, IndexerError> {
445        let system_state: IotaSystemStateSummary =
446            iota_types::iota_system_state::get_iota_system_state(self)?
447                .into_iota_system_state_summary();
448        Ok(system_state)
449    }
450
451    /// Retrieve the system state data for the given epoch. If no epoch is
452    /// given, it will retrieve the latest epoch's data and return the
453    /// system state. System state of the an epoch is written at the end of
454    /// the epoch, so system state of the current epoch is empty until the
455    /// epoch ends. You can call `get_latest_iota_system_state` for current
456    /// epoch instead.
457    pub fn get_epoch_iota_system_state(
458        &self,
459        epoch: Option<EpochId>,
460    ) -> Result<IotaSystemStateSummary, IndexerError> {
461        let stored_epoch = self.get_epoch_info_from_db(epoch)?;
462        let stored_epoch = match stored_epoch {
463            Some(stored_epoch) => stored_epoch,
464            None => return Err(IndexerError::InvalidArgument("Invalid epoch".into())),
465        };
466
467        let system_state: IotaSystemStateSummary = bcs::from_bytes(&stored_epoch.system_state)
468            .map_err(|_| {
469                IndexerError::PersistentStorageDataCorruption(format!(
470                    "Failed to deserialize `system_state` for epoch {:?}",
471                    epoch,
472                ))
473            })?;
474        Ok(system_state)
475    }
476
477    pub async fn get_chain_identifier_in_blocking_task(
478        &self,
479    ) -> Result<ChainIdentifier, IndexerError> {
480        self.spawn_blocking(|this| this.get_chain_identifier())
481            .await
482    }
483
484    pub fn get_chain_identifier(&self) -> Result<ChainIdentifier, IndexerError> {
485        let stored_chain_identifier = run_query!(&self.pool, |conn| {
486            chain_identifier::dsl::chain_identifier
487                .first::<StoredChainIdentifier>(conn)
488                .optional()
489        })?
490        .ok_or(IndexerError::PostgresRead(
491            "chain identifier not found".to_string(),
492        ))?;
493
494        let checkpoint_digest =
495            CheckpointDigest::try_from(stored_chain_identifier.checkpoint_digest).map_err(|e| {
496                IndexerError::PersistentStorageDataCorruption(format!(
497                    "failed to decode chain identifier with err: {e:?}"
498                ))
499            })?;
500
501        Ok(checkpoint_digest.into())
502    }
503
504    pub fn get_checkpoint_from_db(
505        &self,
506        checkpoint_id: CheckpointId,
507    ) -> Result<Option<StoredCheckpoint>, IndexerError> {
508        let stored_checkpoint = run_query!(&self.pool, |conn| {
509            match checkpoint_id {
510                CheckpointId::SequenceNumber(seq) => checkpoints::dsl::checkpoints
511                    .filter(checkpoints::sequence_number.eq(seq as i64))
512                    .first::<StoredCheckpoint>(conn)
513                    .optional(),
514                CheckpointId::Digest(digest) => checkpoints::dsl::checkpoints
515                    .filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
516                    .first::<StoredCheckpoint>(conn)
517                    .optional(),
518            }
519        })?;
520
521        Ok(stored_checkpoint)
522    }
523
524    pub fn get_latest_checkpoint_from_db(&self) -> Result<StoredCheckpoint, IndexerError> {
525        let stored_checkpoint = run_query!(&self.pool, |conn| {
526            checkpoints::dsl::checkpoints
527                .order_by(checkpoints::sequence_number.desc())
528                .first::<StoredCheckpoint>(conn)
529        })?;
530
531        Ok(stored_checkpoint)
532    }
533
534    pub fn get_checkpoint(
535        &self,
536        checkpoint_id: CheckpointId,
537    ) -> Result<Option<iota_json_rpc_types::Checkpoint>, IndexerError> {
538        let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id)? {
539            Some(stored_checkpoint) => stored_checkpoint,
540            None => return Ok(None),
541        };
542
543        let checkpoint = iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)?;
544        Ok(Some(checkpoint))
545    }
546
547    pub fn get_latest_checkpoint(&self) -> Result<iota_json_rpc_types::Checkpoint, IndexerError> {
548        let stored_checkpoint = self.get_latest_checkpoint_from_db()?;
549
550        iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)
551    }
552
553    pub async fn get_latest_checkpoint_timestamp_ms_in_blocking_task(
554        &self,
555    ) -> Result<u64, IndexerError> {
556        self.spawn_blocking(|this| this.get_latest_checkpoint_timestamp_ms())
557            .await
558    }
559
560    pub fn get_latest_checkpoint_timestamp_ms(&self) -> Result<u64, IndexerError> {
561        Ok(self.get_latest_checkpoint()?.timestamp_ms)
562    }
563
564    fn get_checkpoints_from_db(
565        &self,
566        cursor: Option<u64>,
567        limit: usize,
568        descending_order: bool,
569    ) -> Result<Vec<StoredCheckpoint>, IndexerError> {
570        run_query!(&self.pool, |conn| {
571            let mut boxed_query = checkpoints::table.into_boxed();
572            if let Some(cursor) = cursor {
573                if descending_order {
574                    boxed_query =
575                        boxed_query.filter(checkpoints::sequence_number.lt(cursor as i64));
576                } else {
577                    boxed_query =
578                        boxed_query.filter(checkpoints::sequence_number.gt(cursor as i64));
579                }
580            }
581            if descending_order {
582                boxed_query = boxed_query.order_by(checkpoints::sequence_number.desc());
583            } else {
584                boxed_query = boxed_query.order_by(checkpoints::sequence_number.asc());
585            }
586
587            boxed_query
588                .limit(limit as i64)
589                .load::<StoredCheckpoint>(conn)
590        })
591    }
592
593    pub fn get_checkpoints(
594        &self,
595        cursor: Option<u64>,
596        limit: usize,
597        descending_order: bool,
598    ) -> Result<Vec<iota_json_rpc_types::Checkpoint>, IndexerError> {
599        self.get_checkpoints_from_db(cursor, limit, descending_order)?
600            .into_iter()
601            .map(iota_json_rpc_types::Checkpoint::try_from)
602            .collect()
603    }
604
605    fn get_transaction_effects_with_digest(
606        &self,
607        digest: TransactionDigest,
608    ) -> Result<IotaTransactionBlockEffects, IndexerError> {
609        let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
610            transactions::table
611                .filter(
612                    transactions::tx_sequence_number
613                        .nullable()
614                        .eq(tx_digests::table
615                            .select(tx_digests::tx_sequence_number)
616                            // we filter the tx_digests table because it is indexed by digest,
617                            // transactions table is not
618                            .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
619                            .single_value()),
620                )
621                .first::<StoredTransaction>(conn)
622        })?;
623
624        stored_txn.try_into_iota_transaction_effects()
625    }
626
627    fn get_transaction_effects_with_sequence_number(
628        &self,
629        sequence_number: i64,
630    ) -> Result<IotaTransactionBlockEffects, IndexerError> {
631        let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
632            transactions::table
633                .filter(transactions::tx_sequence_number.eq(sequence_number))
634                .first::<StoredTransaction>(conn)
635        })?;
636
637        stored_txn.try_into_iota_transaction_effects()
638    }
639
640    fn multi_get_transactions(
641        &self,
642        digests: &[TransactionDigest],
643    ) -> Result<Vec<StoredTransaction>, IndexerError> {
644        let digests = digests
645            .iter()
646            .map(|digest| digest.inner().to_vec())
647            .collect::<HashSet<_>>();
648        let checkpointed_txs = run_query!(&self.pool, |conn| {
649            transactions::table
650                .inner_join(
651                    tx_digests::table
652                        .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
653                )
654                // we filter the tx_digests table because it is indexed by digest,
655                // transactions table is not
656                .filter(tx_digests::tx_digest.eq_any(&digests))
657                .select(StoredTransaction::as_select())
658                .load::<StoredTransaction>(conn)
659        })?;
660        if checkpointed_txs.len() == digests.len() {
661            return Ok(checkpointed_txs);
662        }
663        let mut missing_digests = digests;
664        for tx in &checkpointed_txs {
665            missing_digests.remove(&tx.transaction_digest);
666        }
667        let optimistic_txs = run_query!(&self.pool, |conn| {
668            optimistic_transactions::table
669                .inner_join(
670                    tx_insertion_order::table.on(optimistic_transactions::insertion_order
671                        .eq(tx_insertion_order::insertion_order)),
672                )
673                // we filter the tx_insertion_order table because it is indexed by digest,
674                // optimistic_transactions table is not
675                .filter(tx_insertion_order::tx_digest.eq_any(missing_digests))
676                .select(OptimisticTransaction::as_select())
677                .load::<OptimisticTransaction>(conn)
678        })?;
679        Ok(checkpointed_txs
680            .into_iter()
681            .chain(optimistic_txs.into_iter().map(Into::into))
682            .collect())
683    }
684
685    async fn multi_get_transactions_in_blocking_task(
686        &self,
687        digests: Vec<TransactionDigest>,
688    ) -> Result<Vec<StoredTransaction>, IndexerError> {
689        self.spawn_blocking(move |this| this.multi_get_transactions(&digests))
690            .await
691    }
692
693    /// This method tries to transform [`StoredTransaction`] values
694    /// into transaction blocks, without any other modification.
695    async fn stored_transaction_to_transaction_block(
696        &self,
697        stored_txes: Vec<StoredTransaction>,
698        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
699    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
700        let mut tx_block_responses_futures = vec![];
701        for stored_tx in stored_txes {
702            let package_resolver_clone = self.package_resolver();
703            let options_clone = options.clone();
704            tx_block_responses_futures.push(tokio::task::spawn(
705                stored_tx.try_into_iota_transaction_block_response(
706                    options_clone,
707                    package_resolver_clone,
708                ),
709            ));
710        }
711
712        let tx_blocks = futures::future::join_all(tx_block_responses_futures)
713            .await
714            .into_iter()
715            .collect::<Result<Vec<_>, _>>()
716            .tap_err(|e| tracing::error!("Failed to join all tx block futures: {}", e))?
717            .into_iter()
718            .collect::<Result<Vec<_>, _>>()
719            .tap_err(|e| tracing::error!("Failed to collect tx block futures: {}", e))?;
720        Ok(tx_blocks)
721    }
722
723    fn multi_get_transactions_with_sequence_numbers(
724        &self,
725        tx_sequence_numbers: Vec<i64>,
726        // Some(true) for desc, Some(false) for asc, None for undefined order
727        is_descending: Option<bool>,
728    ) -> Result<Vec<StoredTransaction>, IndexerError> {
729        let mut query = transactions::table
730            .filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers))
731            .into_boxed();
732        match is_descending {
733            Some(true) => {
734                query = query.order(transactions::dsl::tx_sequence_number.desc());
735            }
736            Some(false) => {
737                query = query.order(transactions::dsl::tx_sequence_number.asc());
738            }
739            None => (),
740        }
741        run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
742    }
743
744    pub async fn get_owned_objects_in_blocking_task(
745        &self,
746        address: IotaAddress,
747        filter: Option<IotaObjectDataFilter>,
748        cursor: Option<ObjectID>,
749        limit: usize,
750    ) -> Result<Vec<StoredObject>, IndexerError> {
751        self.spawn_blocking(move |this| this.get_owned_objects_impl(address, filter, cursor, limit))
752            .await
753    }
754
755    fn get_owned_objects_impl(
756        &self,
757        address: IotaAddress,
758        filter: Option<IotaObjectDataFilter>,
759        cursor: Option<ObjectID>,
760        limit: usize,
761    ) -> Result<Vec<StoredObject>, IndexerError> {
762        run_query!(&self.pool, |conn| {
763            let mut query = objects::dsl::objects
764                .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
765                .filter(objects::dsl::owner_id.eq(address.to_vec()))
766                .order(objects::dsl::object_id.asc())
767                .limit(limit as i64)
768                .into_boxed();
769            if let Some(filter) = filter {
770                match filter {
771                    IotaObjectDataFilter::StructType(struct_tag) => {
772                        let object_type =
773                            struct_tag.to_canonical_string(/* with_prefix */ true);
774                        query =
775                            query.filter(objects::object_type.like(format!("{}%", object_type)));
776                    }
777                    IotaObjectDataFilter::MatchAny(filters) => {
778                        let mut condition = "(".to_string();
779                        for (i, filter) in filters.iter().enumerate() {
780                            if let IotaObjectDataFilter::StructType(struct_tag) = filter {
781                                let object_type =
782                                    struct_tag.to_canonical_string(/* with_prefix */ true);
783                                if i == 0 {
784                                    condition +=
785                                        format!("objects.object_type LIKE '{}%'", object_type)
786                                            .as_str();
787                                } else {
788                                    condition +=
789                                        format!(" OR objects.object_type LIKE '{}%'", object_type)
790                                            .as_str();
791                                }
792                            } else {
793                                return Err(IndexerError::InvalidArgument(
794                                    "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
795                                ));
796                            }
797                        }
798                        condition += ")";
799                        query = query.filter(sql::<Bool>(&condition));
800                    }
801                    IotaObjectDataFilter::MatchNone(filters) => {
802                        for filter in filters {
803                            if let IotaObjectDataFilter::StructType(struct_tag) = filter {
804                                let object_type =
805                                    struct_tag.to_canonical_string(/* with_prefix */ true);
806                                query = query.filter(
807                                    objects::object_type.not_like(format!("{}%", object_type)),
808                                );
809                            } else {
810                                return Err(IndexerError::InvalidArgument(
811                                    "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
812                                ));
813                            }
814                        }
815                    }
816                    _ => {
817                        return Err(IndexerError::InvalidArgument(
818                            "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
819                        ));
820                    }
821                }
822            }
823
824            if let Some(object_cursor) = cursor {
825                query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
826            }
827
828            query
829                .load::<StoredObject>(conn)
830                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
831        })
832    }
833
834    fn filter_object_id_with_type(
835        &self,
836        object_ids: Vec<ObjectID>,
837        object_type: String,
838    ) -> Result<Vec<ObjectID>, IndexerError> {
839        let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
840        let filtered_ids = run_query!(&self.pool, |conn| {
841            objects::dsl::objects
842                .filter(objects::object_id.eq_any(object_ids))
843                .filter(objects::object_type.eq(object_type))
844                .select(objects::object_id)
845                .load::<Vec<u8>>(conn)
846        })?;
847
848        filtered_ids
849            .into_iter()
850            .map(|id| {
851                ObjectID::from_bytes(id.clone()).map_err(|_e| {
852                    IndexerError::PersistentStorageDataCorruption(format!(
853                        "Can't convert {:?} to ObjectID",
854                        id,
855                    ))
856                })
857            })
858            .collect::<Result<Vec<_>, _>>()
859    }
860
861    pub async fn multi_get_objects_in_blocking_task(
862        &self,
863        object_ids: Vec<ObjectID>,
864    ) -> Result<Vec<StoredObject>, IndexerError> {
865        self.spawn_blocking(move |this| this.multi_get_objects_impl(object_ids))
866            .await
867    }
868
869    fn multi_get_objects_impl(
870        &self,
871        object_ids: Vec<ObjectID>,
872    ) -> Result<Vec<StoredObject>, IndexerError> {
873        let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
874        run_query!(&self.pool, |conn| {
875            objects::dsl::objects
876                .filter(objects::object_id.eq_any(object_ids))
877                .load::<StoredObject>(conn)
878        })
879    }
880
881    async fn query_transaction_blocks_by_checkpoint_impl(
882        &self,
883        checkpoint_seq: u64,
884        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
885        cursor_tx_seq: Option<i64>,
886        limit: usize,
887        is_descending: bool,
888    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
889        let pool = self.get_pool();
890        let tx_range: (i64, i64) = run_query_async!(&pool, move |conn| {
891            pruner_cp_watermark::dsl::pruner_cp_watermark
892                .select((
893                    pruner_cp_watermark::min_tx_sequence_number,
894                    pruner_cp_watermark::max_tx_sequence_number,
895                ))
896                // we filter the pruner_cp_watermark table because it is indexed by
897                // checkpoint_sequence_number, transactions is not
898                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
899                .first::<(i64, i64)>(conn)
900        })?;
901
902        let mut query = transactions::dsl::transactions
903            .filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
904            .into_boxed();
905
906        // Translate transaction digest cursor to tx sequence number
907        if let Some(cursor_tx_seq) = cursor_tx_seq {
908            if is_descending {
909                query = query.filter(transactions::dsl::tx_sequence_number.lt(cursor_tx_seq));
910            } else {
911                query = query.filter(transactions::dsl::tx_sequence_number.gt(cursor_tx_seq));
912            }
913        }
914        if is_descending {
915            query = query.order(transactions::dsl::tx_sequence_number.desc());
916        } else {
917            query = query.order(transactions::dsl::tx_sequence_number.asc());
918        }
919        let pool = self.get_pool();
920        let stored_txes = run_query_async!(&pool, move |conn| query
921            .limit(limit as i64)
922            .load::<StoredTransaction>(conn))?;
923
924        self.stored_transaction_to_transaction_block(stored_txes, options)
925            .await
926    }
927
928    pub async fn query_transaction_blocks_in_blocking_task(
929        &self,
930        filter: Option<TransactionFilter>,
931        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
932        cursor: Option<TransactionDigest>,
933        limit: usize,
934        is_descending: bool,
935    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
936        self.query_transaction_blocks_impl(filter, options, cursor, limit, is_descending)
937            .await
938    }
939
940    async fn query_transaction_blocks_impl(
941        &self,
942        filter: Option<TransactionFilter>,
943        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
944        cursor: Option<TransactionDigest>,
945        limit: usize,
946        is_descending: bool,
947    ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
948        let cursor_tx_seq = if let Some(cursor) = cursor {
949            let pool = self.get_pool();
950            let tx_seq = run_query_async!(&pool, move |conn| {
951                tx_digests::table
952                    .select(tx_digests::tx_sequence_number)
953                    // we filter the tx_digests table because it is indexed by digest,
954                    // transactions (and other tables) are not
955                    .filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
956                    .first::<i64>(conn)
957            })?;
958            Some(tx_seq)
959        } else {
960            None
961        };
962        let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
963            if is_descending {
964                format!("AND {TX_SEQUENCE_NUMBER_STR} < {}", cursor_tx_seq)
965            } else {
966                format!("AND {TX_SEQUENCE_NUMBER_STR} > {}", cursor_tx_seq)
967            }
968        } else {
969            "".to_string()
970        };
971        let order_str = if is_descending { "DESC" } else { "ASC" };
972        let (table_name, main_where_clause) = match filter {
973            // Processed above
974            Some(TransactionFilter::Checkpoint(seq)) => {
975                return self
976                    .query_transaction_blocks_by_checkpoint_impl(
977                        seq,
978                        options,
979                        cursor_tx_seq,
980                        limit,
981                        is_descending,
982                    )
983                    .await;
984            }
985            // FIXME: sanitize module & function
986            Some(TransactionFilter::MoveFunction {
987                package,
988                module,
989                function,
990            }) => {
991                let package = Hex::encode(package.to_vec());
992                match (module, function) {
993                    (Some(module), Some(function)) => (
994                        "tx_calls_fun".into(),
995                        format!(
996                            "package = '\\x{}'::bytea AND module = '{}' AND func = '{}'",
997                            package, module, function
998                        ),
999                    ),
1000                    (Some(module), None) => (
1001                        "tx_calls_mod".into(),
1002                        format!(
1003                            "package = '\\x{}'::bytea AND module = '{}'",
1004                            package, module
1005                        ),
1006                    ),
1007                    (None, Some(_)) => {
1008                        return Err(IndexerError::InvalidArgument(
1009                            "Function cannot be present without Module.".into(),
1010                        ));
1011                    }
1012                    (None, None) => (
1013                        "tx_calls_pkg".into(),
1014                        format!("package = '\\x{}'::bytea", package),
1015                    ),
1016                }
1017            }
1018            Some(TransactionFilter::InputObject(object_id)) => {
1019                let object_id = Hex::encode(object_id.to_vec());
1020                (
1021                    "tx_input_objects".into(),
1022                    format!("object_id = '\\x{}'::bytea", object_id),
1023                )
1024            }
1025            Some(TransactionFilter::ChangedObject(object_id)) => {
1026                let object_id = Hex::encode(object_id.to_vec());
1027                (
1028                    "tx_changed_objects".into(),
1029                    format!("object_id = '\\x{}'::bytea", object_id),
1030                )
1031            }
1032            Some(TransactionFilter::FromAddress(from_address)) => {
1033                let from_address = Hex::encode(from_address.to_vec());
1034                (
1035                    "tx_senders".into(),
1036                    format!("sender = '\\x{}'::bytea", from_address),
1037                )
1038            }
1039            Some(TransactionFilter::ToAddress(to_address)) => {
1040                let to_address = Hex::encode(to_address.to_vec());
1041                (
1042                    "tx_recipients".into(),
1043                    format!("recipient = '\\x{}'::bytea", to_address),
1044                )
1045            }
1046            Some(TransactionFilter::FromAndToAddress { from, to }) => {
1047                let from_address = Hex::encode(from.to_vec());
1048                let to_address = Hex::encode(to.to_vec());
1049                // Need to remove ambiguities for tx_sequence_number column
1050                let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
1051                    if is_descending {
1052                        format!(
1053                            "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} < {}",
1054                            cursor_tx_seq
1055                        )
1056                    } else {
1057                        format!(
1058                            "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} > {}",
1059                            cursor_tx_seq
1060                        )
1061                    }
1062                } else {
1063                    "".to_string()
1064                };
1065                let inner_query = format!(
1066                    "(SELECT tx_senders.{TX_SEQUENCE_NUMBER_STR} \
1067                    FROM tx_senders \
1068                    JOIN tx_recipients \
1069                    ON tx_senders.{TX_SEQUENCE_NUMBER_STR} = tx_recipients.{TX_SEQUENCE_NUMBER_STR} \
1070                    WHERE tx_senders.sender = '\\x{}'::BYTEA \
1071                    AND tx_recipients.recipient = '\\x{}'::BYTEA \
1072                    {} \
1073                    ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1074                    LIMIT {}) AS inner_query
1075                    ",
1076                    from_address,
1077                    to_address,
1078                    cursor_clause,
1079                    order_str,
1080                    limit,
1081                );
1082                (inner_query, "1 = 1".into())
1083            }
1084            Some(TransactionFilter::FromOrToAddress { addr }) => {
1085                let address = Hex::encode(addr.to_vec());
1086                let inner_query = format!(
1087                    "( \
1088                        ( \
1089                            SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_senders \
1090                            WHERE sender = '\\x{}'::BYTEA {} \
1091                            ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1092                            LIMIT {} \
1093                        ) \
1094                        UNION \
1095                        ( \
1096                            SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_recipients \
1097                            WHERE recipient = '\\x{}'::BYTEA {} \
1098                            ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1099                            LIMIT {} \
1100                        ) \
1101                    ) AS combined",
1102                    address,
1103                    cursor_clause,
1104                    order_str,
1105                    limit,
1106                    address,
1107                    cursor_clause,
1108                    order_str,
1109                    limit,
1110                );
1111                (inner_query, "1 = 1".into())
1112            }
1113            Some(TransactionFilter::TransactionKind(kind)) => {
1114                // The `SystemTransaction` variant can be used to filter for all types of system
1115                // transactions.
1116                if kind == IotaTransactionKind::SystemTransaction {
1117                    ("tx_kinds".into(), "tx_kind != 1".to_string())
1118                } else {
1119                    ("tx_kinds".into(), format!("tx_kind = {}", kind as u8))
1120                }
1121            }
1122            Some(TransactionFilter::TransactionKindIn(kind_vec)) => {
1123                if kind_vec.is_empty() {
1124                    return Err(IndexerError::InvalidArgument(
1125                        "no transaction kind provided".into(),
1126                    ));
1127                }
1128
1129                let mut has_system_transaction = false;
1130                let mut has_programmable_transaction = false;
1131                let mut other_kinds = HashSet::new();
1132
1133                for kind in kind_vec.iter() {
1134                    match kind {
1135                        IotaTransactionKind::SystemTransaction => has_system_transaction = true,
1136                        IotaTransactionKind::ProgrammableTransaction => {
1137                            has_programmable_transaction = true
1138                        }
1139                        other => {
1140                            other_kinds.insert(*other as u8);
1141                        }
1142                    }
1143                }
1144
1145                let query = if has_system_transaction {
1146                    // Case: If `SystemTransaction` is present but `ProgrammableTransaction` is not,
1147                    // we need to filter out `ProgrammableTransaction`.
1148                    if !has_programmable_transaction {
1149                        "tx_kind != 1".to_string()
1150                    } else {
1151                        // No filter applied if both exist
1152                        "1 = 1".to_string()
1153                    }
1154                } else {
1155                    // Case: `ProgrammableTransaction` is present
1156                    if has_programmable_transaction {
1157                        other_kinds.insert(IotaTransactionKind::ProgrammableTransaction as u8);
1158                    }
1159
1160                    if other_kinds.is_empty() {
1161                        // If there's nothing to filter on, return an empty query
1162                        "1 = 1".to_string()
1163                    } else {
1164                        let mut query = String::from("tx_kind IN (");
1165                        query.push_str(
1166                            &other_kinds
1167                                .iter()
1168                                .map(ToString::to_string)
1169                                .collect::<Vec<_>>()
1170                                .join(", "),
1171                        );
1172                        query.push(')');
1173                        query
1174                    }
1175                };
1176
1177                ("tx_kinds".into(), query)
1178            }
1179            None => {
1180                // apply no filter
1181                ("transactions".into(), "1 = 1".into())
1182            }
1183        };
1184
1185        let query = format!(
1186            "SELECT {TX_SEQUENCE_NUMBER_STR} FROM {} WHERE {} {} ORDER BY {TX_SEQUENCE_NUMBER_STR} {} LIMIT {}",
1187            table_name, main_where_clause, cursor_clause, order_str, limit,
1188        );
1189
1190        tracing::debug!("query transaction blocks: {}", query);
1191        let pool = self.get_pool();
1192        let tx_sequence_numbers = run_query_async!(&pool, move |conn| {
1193            diesel::sql_query(query.clone()).load::<TxSequenceNumber>(conn)
1194        })?
1195        .into_iter()
1196        .map(|tsn| tsn.tx_sequence_number)
1197        .collect::<Vec<i64>>();
1198        self.multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1199            tx_sequence_numbers,
1200            options,
1201            Some(is_descending),
1202        )
1203        .await
1204    }
1205
1206    async fn multi_get_transaction_block_response_in_blocking_task_impl(
1207        &self,
1208        digests: &[TransactionDigest],
1209        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1210    ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1211        let stored_txes = self
1212            .multi_get_transactions_in_blocking_task(digests.to_vec())
1213            .await?;
1214        self.stored_transaction_to_transaction_block(stored_txes, options)
1215            .await
1216    }
1217
1218    async fn multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1219        &self,
1220        tx_sequence_numbers: Vec<i64>,
1221        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1222        // Some(true) for desc, Some(false) for asc, None for undefined order
1223        is_descending: Option<bool>,
1224    ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1225        let stored_txes: Vec<StoredTransaction> = self
1226            .spawn_blocking(move |this| {
1227                this.multi_get_transactions_with_sequence_numbers(
1228                    tx_sequence_numbers,
1229                    is_descending,
1230                )
1231            })
1232            .await?;
1233        self.stored_transaction_to_transaction_block(stored_txes, options)
1234            .await
1235    }
1236
1237    pub async fn multi_get_transaction_block_response_in_blocking_task(
1238        &self,
1239        digests: Vec<TransactionDigest>,
1240        options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1241    ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1242        self.multi_get_transaction_block_response_in_blocking_task_impl(&digests, options)
1243            .await
1244    }
1245
1246    pub async fn get_transaction_events_in_blocking_task(
1247        &self,
1248        digest: TransactionDigest,
1249    ) -> Result<Vec<iota_json_rpc_types::IotaEvent>, IndexerError> {
1250        let pool = self.get_pool();
1251        let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| {
1252            transactions::table
1253                .filter(
1254                    transactions::tx_sequence_number
1255                        .nullable()
1256                        .eq(tx_digests::table
1257                            .select(tx_digests::tx_sequence_number)
1258                            // we filter the tx_digests table because it is indexed by digest,
1259                            // transactions table is not
1260                            .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
1261                            .single_value()),
1262                )
1263                .select((transactions::timestamp_ms, transactions::events))
1264                .first::<(i64, StoredTransactionEvents)>(conn)
1265        })?;
1266
1267        let events = stored_events_to_events(serialized_events)?;
1268        let tx_events = TransactionEvents { data: events };
1269
1270        let iota_tx_events = tx_events_to_iota_tx_events(
1271            tx_events,
1272            self.package_resolver(),
1273            digest,
1274            timestamp_ms as u64,
1275        )
1276        .await?;
1277        Ok(iota_tx_events.map_or(vec![], |ste| ste.data))
1278    }
1279
1280    async fn query_events_by_tx_digest(
1281        &self,
1282        tx_digest: TransactionDigest,
1283        cursor: Option<EventID>,
1284        limit: usize,
1285        descending_order: bool,
1286    ) -> IndexerResult<Vec<IotaEvent>> {
1287        let mut query = events::table.into_boxed();
1288
1289        if let Some(cursor) = cursor {
1290            if cursor.tx_digest != tx_digest {
1291                return Err(IndexerError::InvalidArgument(
1292                    "Cursor tx_digest does not match the tx_digest in the query.".into(),
1293                ));
1294            }
1295            if descending_order {
1296                query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
1297            } else {
1298                query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
1299            }
1300        } else if descending_order {
1301            query = query.filter(events::event_sequence_number.le(i64::MAX));
1302        } else {
1303            query = query.filter(events::event_sequence_number.ge(0));
1304        };
1305
1306        if descending_order {
1307            query = query.order(events::event_sequence_number.desc());
1308        } else {
1309            query = query.order(events::event_sequence_number.asc());
1310        }
1311
1312        query = query.filter(
1313            events::tx_sequence_number.nullable().eq(tx_digests::table
1314                .select(tx_digests::tx_sequence_number)
1315                // we filter the tx_digests table because it is indexed by digest,
1316                // events table is not
1317                .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1318                .single_value()),
1319        );
1320
1321        let pool = self.get_pool();
1322        let stored_events = run_query_async!(&pool, move |conn| {
1323            query.limit(limit as i64).load::<StoredEvent>(conn)
1324        })?;
1325
1326        let mut iota_event_futures = vec![];
1327        for stored_event in stored_events {
1328            iota_event_futures.push(tokio::task::spawn(
1329                stored_event.try_into_iota_event(self.package_resolver.clone()),
1330            ));
1331        }
1332
1333        let iota_events = futures::future::join_all(iota_event_futures)
1334            .await
1335            .into_iter()
1336            .collect::<Result<Vec<_>, _>>()
1337            .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
1338            .into_iter()
1339            .collect::<Result<Vec<_>, _>>()
1340            .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
1341        Ok(iota_events)
1342    }
1343
1344    pub async fn query_events_in_blocking_task(
1345        &self,
1346        filter: EventFilter,
1347        cursor: Option<EventID>,
1348        limit: usize,
1349        descending_order: bool,
1350    ) -> IndexerResult<Vec<IotaEvent>> {
1351        let pool = self.get_pool();
1352        let (tx_seq, event_seq) = if let Some(cursor) = cursor {
1353            let EventID {
1354                tx_digest,
1355                event_seq,
1356            } = cursor;
1357            let tx_seq = run_query_async!(&pool, move |conn| {
1358                transactions::dsl::transactions
1359                    .select(transactions::tx_sequence_number)
1360                    .filter(
1361                        transactions::tx_sequence_number
1362                            .nullable()
1363                            .eq(tx_digests::table
1364                                .select(tx_digests::tx_sequence_number)
1365                                // we filter the tx_digests table because it is indexed by digest,
1366                                // transactions table is not
1367                                .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1368                                .single_value()),
1369                    )
1370                    .first::<i64>(conn)
1371            })?;
1372            (tx_seq, event_seq as i64)
1373        } else if descending_order {
1374            let max_tx_seq = i64::MAX;
1375            let max_event_seq = i64::MAX;
1376            (max_tx_seq, max_event_seq)
1377        } else {
1378            (-1, 0)
1379        };
1380
1381        let query = if let EventFilter::Sender(sender) = &filter {
1382            // Need to remove ambiguities for tx_sequence_number column
1383            let cursor_clause = if descending_order {
1384                format!(
1385                    "(e.{TX_SEQUENCE_NUMBER_STR} < {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} < {}))",
1386                    tx_seq, tx_seq, event_seq
1387                )
1388            } else {
1389                format!(
1390                    "(e.{TX_SEQUENCE_NUMBER_STR} > {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} > {}))",
1391                    tx_seq, tx_seq, event_seq
1392                )
1393            };
1394            let order_clause = if descending_order {
1395                format!("e.{TX_SEQUENCE_NUMBER_STR} DESC, e.{EVENT_SEQUENCE_NUMBER_STR} DESC")
1396            } else {
1397                format!("e.{TX_SEQUENCE_NUMBER_STR} ASC, e.{EVENT_SEQUENCE_NUMBER_STR} ASC")
1398            };
1399            format!(
1400                "( \
1401                    SELECT *
1402                    FROM tx_senders s
1403                    JOIN events e
1404                    ON e.tx_sequence_number = s.tx_sequence_number
1405                    AND s.sender = '\\x{}'::bytea
1406                    WHERE {} \
1407                    ORDER BY {} \
1408                    LIMIT {}
1409                )",
1410                Hex::encode(sender.to_vec()),
1411                cursor_clause,
1412                order_clause,
1413                limit,
1414            )
1415        } else if let EventFilter::Transaction(tx_digest) = filter {
1416            return self
1417                .query_events_by_tx_digest(tx_digest, cursor, limit, descending_order)
1418                .await;
1419        } else {
1420            let main_where_clause = match filter {
1421                EventFilter::Package(package_id) => {
1422                    format!("package = '\\x{}'::bytea", package_id.to_hex())
1423                }
1424                EventFilter::MoveModule { package, module } => {
1425                    format!(
1426                        "package = '\\x{}'::bytea AND module = '{}'",
1427                        package.to_hex(),
1428                        module,
1429                    )
1430                }
1431                EventFilter::MoveEventType(struct_tag) => {
1432                    let formatted_struct_tag = struct_tag.to_canonical_string(true);
1433                    format!("event_type = '{formatted_struct_tag}'")
1434                }
1435                EventFilter::MoveEventModule { package, module } => {
1436                    let package_module_prefix = format!("{}::{}", package.to_hex_literal(), module);
1437                    format!("event_type LIKE '{package_module_prefix}::%'")
1438                }
1439                EventFilter::Sender(_) => {
1440                    // Processed above
1441                    unreachable!()
1442                }
1443                EventFilter::Transaction(_) => {
1444                    // Processed above
1445                    unreachable!()
1446                }
1447                EventFilter::MoveEventField { .. }
1448                | EventFilter::All(_)
1449                | EventFilter::Any(_)
1450                | EventFilter::And(_, _)
1451                | EventFilter::Or(_, _)
1452                | EventFilter::TimeRange { .. } => {
1453                    return Err(IndexerError::NotSupported(
1454                        "This type of EventFilter is not supported.".into(),
1455                    ));
1456                }
1457            };
1458
1459            let cursor_clause = if descending_order {
1460                format!(
1461                    "AND ({TX_SEQUENCE_NUMBER_STR} < {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} < {}))",
1462                    tx_seq, tx_seq, event_seq
1463                )
1464            } else {
1465                format!(
1466                    "AND ({TX_SEQUENCE_NUMBER_STR} > {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} > {}))",
1467                    tx_seq, tx_seq, event_seq
1468                )
1469            };
1470            let order_clause = if descending_order {
1471                format!("{TX_SEQUENCE_NUMBER_STR} DESC, {EVENT_SEQUENCE_NUMBER_STR} DESC")
1472            } else {
1473                format!("{TX_SEQUENCE_NUMBER_STR} ASC, {EVENT_SEQUENCE_NUMBER_STR} ASC")
1474            };
1475
1476            format!(
1477                "
1478                    SELECT * FROM events \
1479                    WHERE {} {} \
1480                    ORDER BY {} \
1481                    LIMIT {}
1482                ",
1483                main_where_clause, cursor_clause, order_clause, limit,
1484            )
1485        };
1486        tracing::debug!("query events: {}", query);
1487        let pool = self.get_pool();
1488        let stored_events = run_query_async!(&pool, move |conn| diesel::sql_query(query)
1489            .load::<StoredEvent>(conn))?;
1490
1491        let mut iota_event_futures = vec![];
1492        for stored_event in stored_events {
1493            iota_event_futures.push(tokio::task::spawn(
1494                stored_event.try_into_iota_event(self.package_resolver.clone()),
1495            ));
1496        }
1497
1498        let iota_events = futures::future::join_all(iota_event_futures)
1499            .await
1500            .into_iter()
1501            .collect::<Result<Vec<_>, _>>()
1502            .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
1503            .into_iter()
1504            .collect::<Result<Vec<_>, _>>()
1505            .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
1506        Ok(iota_events)
1507    }
1508
1509    pub async fn get_dynamic_fields_in_blocking_task(
1510        &self,
1511        parent_object_id: ObjectID,
1512        cursor: Option<ObjectID>,
1513        limit: usize,
1514    ) -> Result<Vec<DynamicFieldInfo>, IndexerError> {
1515        let stored_objects = self
1516            .spawn_blocking(move |this| {
1517                this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1518            })
1519            .await?;
1520
1521        let mut df_futures = vec![];
1522        let indexer_reader_arc = Arc::new(self.clone());
1523        for stored_object in stored_objects {
1524            let indexer_reader_arc_clone = Arc::clone(&indexer_reader_arc);
1525            df_futures.push(tokio::task::spawn(async move {
1526                indexer_reader_arc_clone
1527                    .try_create_dynamic_field_info(stored_object)
1528                    .await
1529            }));
1530        }
1531        let df_infos = futures::future::try_join_all(df_futures)
1532            .await
1533            .tap_err(|e| tracing::error!("Error joining DF futures: {:?}", e))?
1534            .into_iter()
1535            .collect::<Result<Vec<_>, _>>()
1536            .tap_err(|e| {
1537                tracing::error!(
1538                    "Error calling DF try_create_dynamic_field_info function: {:?}",
1539                    e
1540                )
1541            })?
1542            .into_iter()
1543            .flatten()
1544            .collect::<Vec<_>>();
1545        Ok(df_infos)
1546    }
1547
1548    pub async fn get_dynamic_fields_raw_in_blocking_task(
1549        &self,
1550        parent_object_id: ObjectID,
1551        cursor: Option<ObjectID>,
1552        limit: usize,
1553    ) -> Result<Vec<StoredObject>, IndexerError> {
1554        self.spawn_blocking(move |this| {
1555            this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1556        })
1557        .await
1558    }
1559
1560    fn get_dynamic_fields_raw(
1561        &self,
1562        parent_object_id: ObjectID,
1563        cursor: Option<ObjectID>,
1564        limit: usize,
1565    ) -> Result<Vec<StoredObject>, IndexerError> {
1566        let objects: Vec<StoredObject> = run_query!(&self.pool, |conn| {
1567            let mut query = objects::dsl::objects
1568                .filter(objects::dsl::owner_type.eq(OwnerType::Object as i16))
1569                .filter(objects::dsl::owner_id.eq(parent_object_id.to_vec()))
1570                .order(objects::dsl::object_id.asc())
1571                .limit(limit as i64)
1572                .into_boxed();
1573            if let Some(object_cursor) = cursor {
1574                query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
1575            }
1576            query.load::<StoredObject>(conn)
1577        })?;
1578
1579        Ok(objects)
1580    }
1581
1582    async fn try_create_dynamic_field_info(
1583        &self,
1584        stored_object: StoredObject,
1585    ) -> Result<Option<DynamicFieldInfo>, IndexerError> {
1586        if stored_object.df_kind.is_none() {
1587            return Ok(None);
1588        }
1589
1590        let object: Object = stored_object.try_into()?;
1591        let Some(move_object) = object.data.try_as_move().cloned() else {
1592            return Err(IndexerError::ResolveMoveStruct(
1593                "Object is not a MoveObject".to_string(),
1594            ));
1595        };
1596        let type_tag: TypeTag = move_object.type_().clone().into();
1597        let layout = self
1598            .package_resolver
1599            .type_layout(type_tag.clone())
1600            .await
1601            .map_err(|e| {
1602                IndexerError::ResolveMoveStruct(format!(
1603                    "Failed to get type layout for type {}: {e}",
1604                    type_tag.to_canonical_display(/* with_prefix */ true),
1605                ))
1606            })?;
1607
1608        let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
1609            .tap_err(|e| tracing::warn!("{e}"))?;
1610
1611        let type_ = field.kind;
1612        let name_type: TypeTag = field.name_layout.into();
1613        let bcs_name = field.name_bytes.to_owned();
1614
1615        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
1616            .tap_err(|e| tracing::warn!("{e}"))?;
1617
1618        let name = DynamicFieldName {
1619            type_: name_type,
1620            value: IotaMoveValue::from(name_value).to_json_value(),
1621        };
1622
1623        let value_metadata = field.value_metadata().map_err(|e| {
1624            tracing::warn!("{e}");
1625            IndexerError::Uncategorized(anyhow!(e))
1626        })?;
1627
1628        Ok(Some(match value_metadata {
1629            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
1630                name,
1631                bcs_name,
1632                type_,
1633                object_type: object_type.to_canonical_string(/* with_prefix */ true),
1634                object_id: object.id(),
1635                version: object.version(),
1636                digest: object.digest(),
1637            },
1638
1639            DFV::ValueMetadata::DynamicObjectField(object_id) => {
1640                let object = self
1641                    .get_object_in_blocking_task(object_id)
1642                    .await?
1643                    .ok_or_else(|| {
1644                        IndexerError::Uncategorized(anyhow!(
1645                            "Failed to find object_id {} when trying to create dynamic field info",
1646                            object_id.to_canonical_display(/* with_prefix */ true),
1647                        ))
1648                    })?;
1649
1650                let object_type = object.data.type_().unwrap().clone();
1651                DynamicFieldInfo {
1652                    name,
1653                    bcs_name,
1654                    type_,
1655                    object_type: object_type.to_canonical_string(/* with_prefix */ true),
1656                    object_id,
1657                    version: object.version(),
1658                    digest: object.digest(),
1659                }
1660            }
1661        }))
1662    }
1663
1664    pub async fn bcs_name_from_dynamic_field_name(
1665        &self,
1666        name: &DynamicFieldName,
1667    ) -> Result<Vec<u8>, IndexerError> {
1668        let move_type_layout = self
1669            .package_resolver()
1670            .type_layout(name.type_.clone())
1671            .await
1672            .map_err(|e| {
1673                IndexerError::ResolveMoveStruct(format!(
1674                    "Failed to get type layout for type {}: {}",
1675                    name.type_, e
1676                ))
1677            })?;
1678        let iota_json_value = iota_json::IotaJsonValue::new(name.value.clone())?;
1679        let name_bcs_value = iota_json_value.to_bcs_bytes(&move_type_layout)?;
1680        Ok(name_bcs_value)
1681    }
1682
1683    pub async fn get_display_object_by_type(
1684        &self,
1685        object_type: &move_core_types::language_storage::StructTag,
1686    ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1687        let object_type = object_type.to_canonical_string(/* with_prefix */ true);
1688        self.spawn_blocking(move |this| this.get_display_update_event(object_type))
1689            .await
1690    }
1691
1692    fn get_display_update_event(
1693        &self,
1694        object_type: String,
1695    ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1696        let stored_display = run_query!(&self.pool, |conn| {
1697            display::table
1698                .filter(display::object_type.eq(object_type))
1699                .first::<StoredDisplay>(conn)
1700                .optional()
1701        })?;
1702
1703        let stored_display = match stored_display {
1704            Some(display) => display,
1705            None => return Ok(None),
1706        };
1707
1708        let display_update = stored_display.to_display_update_event()?;
1709
1710        Ok(Some(display_update))
1711    }
1712
1713    pub async fn get_owned_coins_in_blocking_task(
1714        &self,
1715        owner: IotaAddress,
1716        coin_type: Option<String>,
1717        cursor: ObjectID,
1718        limit: usize,
1719    ) -> Result<Vec<IotaCoin>, IndexerError> {
1720        self.spawn_blocking(move |this| this.get_owned_coins(owner, coin_type, cursor, limit))
1721            .await
1722    }
1723
1724    fn get_owned_coins(
1725        &self,
1726        owner: IotaAddress,
1727        // If coin_type is None, look for all coins.
1728        coin_type: Option<String>,
1729        cursor: ObjectID,
1730        limit: usize,
1731    ) -> Result<Vec<IotaCoin>, IndexerError> {
1732        let mut query = objects::dsl::objects
1733            .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
1734            .filter(objects::dsl::owner_id.eq(owner.to_vec()))
1735            .filter(objects::dsl::object_id.gt(cursor.to_vec()))
1736            .into_boxed();
1737        if let Some(coin_type) = coin_type {
1738            query = query.filter(objects::dsl::coin_type.eq(Some(coin_type)));
1739        } else {
1740            query = query.filter(objects::dsl::coin_type.is_not_null());
1741        }
1742        query = query
1743            .order(objects::dsl::object_id.asc())
1744            .limit(limit as i64);
1745
1746        let stored_objects = run_query!(&self.pool, |conn| query.load::<StoredObject>(conn))?;
1747
1748        stored_objects
1749            .into_iter()
1750            .map(|o| o.try_into())
1751            .collect::<IndexerResult<Vec<_>>>()
1752    }
1753
1754    pub async fn get_coin_balances_in_blocking_task(
1755        &self,
1756        owner: IotaAddress,
1757        // If coin_type is None, look for all coins.
1758        coin_type: Option<String>,
1759    ) -> Result<Vec<Balance>, IndexerError> {
1760        self.spawn_blocking(move |this| this.get_coin_balances(owner, coin_type))
1761            .await
1762    }
1763
1764    fn get_coin_balances(
1765        &self,
1766        owner: IotaAddress,
1767        // If coin_type is None, look for all coins.
1768        coin_type: Option<String>,
1769    ) -> Result<Vec<Balance>, IndexerError> {
1770        let coin_type_filter = if let Some(coin_type) = coin_type {
1771            format!("= '{}'", coin_type)
1772        } else {
1773            "IS NOT NULL".to_string()
1774        };
1775        // Note: important to cast to BIGINT to avoid deserialize confusion
1776        let query = format!(
1777            "
1778            SELECT coin_type, \
1779            CAST(COUNT(*) AS BIGINT) AS coin_num, \
1780            CAST(SUM(coin_balance) AS BIGINT) AS coin_balance \
1781            FROM objects \
1782            WHERE owner_type = {} \
1783            AND owner_id = '\\x{}'::BYTEA \
1784            AND coin_type {} \
1785            GROUP BY coin_type \
1786            ORDER BY coin_type ASC
1787        ",
1788            OwnerType::Address as i16,
1789            Hex::encode(owner.to_vec()),
1790            coin_type_filter,
1791        );
1792
1793        tracing::debug!("get coin balances query: {query}");
1794        let coin_balances = run_query!(&self.pool, |conn| diesel::sql_query(query)
1795            .load::<CoinBalance>(conn))?;
1796        coin_balances
1797            .into_iter()
1798            .map(|cb| cb.try_into())
1799            .collect::<IndexerResult<Vec<_>>>()
1800    }
1801
1802    pub fn get_latest_network_metrics(&self) -> IndexerResult<NetworkMetrics> {
1803        let mut metrics = run_query!(&self.pool, |conn| {
1804            diesel::sql_query("SELECT * FROM network_metrics;")
1805                .get_result::<StoredNetworkMetrics>(conn)
1806        })?;
1807        if metrics.total_addresses == -1 {
1808            // this implies that the estimate is not available in the db
1809            // so we fallback to the more expensive count query
1810            metrics.total_addresses = run_query!(&self.pool, |conn| {
1811                addresses::dsl::addresses.count().get_result::<i64>(conn)
1812            })?;
1813        }
1814        if metrics.total_packages == -1 {
1815            // this implies that the estimate is not available in the db
1816            // so we fallback to the more expensive count query
1817            metrics.total_packages = run_query!(&self.pool, |conn| {
1818                packages::dsl::packages.count().get_result::<i64>(conn)
1819            })?;
1820        }
1821        Ok(metrics.into())
1822    }
1823
1824    /// Get the latest move call metrics.
1825    pub fn get_latest_move_call_metrics(&self) -> IndexerResult<MoveCallMetrics> {
1826        let latest_3_days = self.get_latest_move_call_metrics_by_day(3)?;
1827        let latest_7_days = self.get_latest_move_call_metrics_by_day(7)?;
1828        let latest_30_days = self.get_latest_move_call_metrics_by_day(30)?;
1829
1830        // sort by call count desc.
1831        let rank_3_days = latest_3_days
1832            .into_iter()
1833            .sorted_by(|a, b| b.1.cmp(&a.1))
1834            .collect::<Vec<_>>();
1835        let rank_7_days = latest_7_days
1836            .into_iter()
1837            .sorted_by(|a, b| b.1.cmp(&a.1))
1838            .collect::<Vec<_>>();
1839        let rank_30_days = latest_30_days
1840            .into_iter()
1841            .sorted_by(|a, b| b.1.cmp(&a.1))
1842            .collect::<Vec<_>>();
1843
1844        Ok(MoveCallMetrics {
1845            rank_3_days,
1846            rank_7_days,
1847            rank_30_days,
1848        })
1849    }
1850
1851    /// Get the latest move call metrics by day.
1852    pub fn get_latest_move_call_metrics_by_day(
1853        &self,
1854        day_value: i64,
1855    ) -> IndexerResult<Vec<(MoveFunctionName, usize)>> {
1856        let query = "
1857            SELECT id, epoch, day, move_package, move_module, move_function, count
1858            FROM move_call_metrics
1859            WHERE day = $1
1860              AND epoch = (SELECT MAX(epoch) FROM move_call_metrics WHERE day = $1)
1861            ORDER BY count DESC
1862            LIMIT 10
1863        ";
1864
1865        let queried_metrics = run_query!(&self.pool, |conn| sql_query(query)
1866            .bind::<BigInt, _>(day_value)
1867            .load::<QueriedMoveCallMetrics>(conn))?;
1868
1869        let metrics = queried_metrics
1870            .into_iter()
1871            .map(|m| {
1872                m.try_into()
1873                    .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))
1874            })
1875            .collect::<Result<Vec<_>, _>>()?;
1876
1877        Ok(metrics)
1878    }
1879
1880    pub fn get_latest_address_metrics(&self) -> IndexerResult<AddressMetrics> {
1881        let stored_address_metrics = run_query!(&self.pool, |conn| {
1882            address_metrics::table
1883                .order(address_metrics::dsl::checkpoint.desc())
1884                .first::<StoredAddressMetrics>(conn)
1885        })?;
1886        Ok(stored_address_metrics.into())
1887    }
1888
1889    pub fn get_checkpoint_address_metrics(
1890        &self,
1891        checkpoint_seq: u64,
1892    ) -> IndexerResult<AddressMetrics> {
1893        let stored_address_metrics = run_query!(&self.pool, |conn| {
1894            address_metrics::table
1895                .filter(address_metrics::dsl::checkpoint.eq(checkpoint_seq as i64))
1896                .first::<StoredAddressMetrics>(conn)
1897        })?;
1898        Ok(stored_address_metrics.into())
1899    }
1900
1901    pub fn get_all_epoch_address_metrics(
1902        &self,
1903        descending_order: Option<bool>,
1904    ) -> IndexerResult<Vec<AddressMetrics>> {
1905        let is_descending = descending_order.unwrap_or_default();
1906        let epoch_address_metrics_query = format!(
1907            "WITH ranked_rows AS (
1908                SELECT
1909                  checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses,
1910                  row_number() OVER(PARTITION BY epoch ORDER BY checkpoint DESC) as row_num
1911                FROM
1912                  address_metrics
1913              )
1914              SELECT
1915                checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses
1916              FROM ranked_rows
1917              WHERE row_num = 1 ORDER BY epoch {}",
1918            if is_descending { "DESC" } else { "ASC" },
1919        );
1920        let epoch_address_metrics = run_query!(&self.pool, |conn| {
1921            diesel::sql_query(epoch_address_metrics_query).load::<StoredAddressMetrics>(conn)
1922        })?;
1923
1924        Ok(epoch_address_metrics
1925            .into_iter()
1926            .map(|stored_address_metrics| stored_address_metrics.into())
1927            .collect())
1928    }
1929
1930    pub(crate) async fn get_display_fields(
1931        &self,
1932        original_object: &iota_types::object::Object,
1933        original_layout: &Option<MoveStructLayout>,
1934    ) -> Result<DisplayFieldsResponse, IndexerError> {
1935        let (object_type, layout) = if let Some((object_type, layout)) =
1936            iota_json_rpc::read_api::get_object_type_and_struct(original_object, original_layout)
1937                .map_err(|e| IndexerError::Generic(e.to_string()))?
1938        {
1939            (object_type, layout)
1940        } else {
1941            return Ok(DisplayFieldsResponse {
1942                data: None,
1943                error: None,
1944            });
1945        };
1946
1947        if let Some(display_object) = self.get_display_object_by_type(&object_type).await? {
1948            return iota_json_rpc::read_api::get_rendered_fields(display_object.fields, &layout)
1949                .map_err(|e| IndexerError::Generic(e.to_string()));
1950        }
1951        Ok(DisplayFieldsResponse {
1952            data: None,
1953            error: None,
1954        })
1955    }
1956
1957    pub async fn get_coin_metadata_in_blocking_task(
1958        &self,
1959        coin_struct: StructTag,
1960    ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1961        self.spawn_blocking(move |this| this.get_coin_metadata(coin_struct))
1962            .await
1963    }
1964
1965    fn get_coin_metadata(
1966        &self,
1967        coin_struct: StructTag,
1968    ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1969        let package_id = coin_struct.address.into();
1970        let coin_metadata_type =
1971            CoinMetadata::type_(coin_struct).to_canonical_string(/* with_prefix */ true);
1972        let coin_metadata_obj_id = *self
1973            .package_obj_type_cache
1974            .lock()
1975            .unwrap()
1976            .cache_get_or_set_with(format!("{}{}", package_id, coin_metadata_type), || {
1977                get_single_obj_id_from_package_publish(self, package_id, coin_metadata_type.clone())
1978                    .unwrap()
1979            });
1980        if let Some(id) = coin_metadata_obj_id {
1981            let metadata_object = self.get_object(&id, None)?;
1982            Ok(metadata_object.and_then(|v| IotaCoinMetadata::try_from(v).ok()))
1983        } else {
1984            Ok(None)
1985        }
1986    }
1987
1988    pub async fn get_total_supply_in_blocking_task(
1989        &self,
1990        coin_struct: StructTag,
1991    ) -> Result<Supply, IndexerError> {
1992        self.spawn_blocking(move |this| this.get_total_supply(coin_struct))
1993            .await
1994    }
1995
1996    fn get_total_supply(&self, coin_struct: StructTag) -> Result<Supply, IndexerError> {
1997        let package_id = coin_struct.address.into();
1998        let treasury_cap_type =
1999            TreasuryCap::type_(coin_struct).to_canonical_string(/* with_prefix */ true);
2000        let treasury_cap_obj_id = self
2001            .package_obj_type_cache
2002            .lock()
2003            .unwrap()
2004            .cache_get_or_set_with(format!("{}{}", package_id, treasury_cap_type), || {
2005                get_single_obj_id_from_package_publish(self, package_id, treasury_cap_type.clone())
2006                    .unwrap()
2007            })
2008            .ok_or(IndexerError::Generic(format!(
2009                "Cannot find treasury cap for type {}",
2010                treasury_cap_type
2011            )))?;
2012        let treasury_cap_obj_object =
2013            self.get_object(&treasury_cap_obj_id, None)?
2014                .ok_or(IndexerError::Generic(format!(
2015                    "Cannot find treasury cap object with id {}",
2016                    treasury_cap_obj_id
2017                )))?;
2018        Ok(TreasuryCap::try_from(treasury_cap_obj_object)?.total_supply)
2019    }
2020
2021    pub fn get_consistent_read_range(&self) -> Result<(i64, i64), IndexerError> {
2022        let latest_checkpoint_sequence = run_query!(&self.pool, |conn| {
2023            checkpoints::table
2024                .select(checkpoints::sequence_number)
2025                .order(checkpoints::sequence_number.desc())
2026                .first::<i64>(conn)
2027                .optional()
2028        })?
2029        .unwrap_or_default();
2030        let latest_object_snapshot_checkpoint_sequence = run_query!(&self.pool, |conn| {
2031            objects_snapshot::table
2032                .select(objects_snapshot::checkpoint_sequence_number)
2033                .order(objects_snapshot::checkpoint_sequence_number.desc())
2034                .first::<i64>(conn)
2035                .optional()
2036        })?
2037        .unwrap_or_default();
2038        Ok((
2039            latest_object_snapshot_checkpoint_sequence,
2040            latest_checkpoint_sequence,
2041        ))
2042    }
2043
2044    pub fn package_resolver(&self) -> PackageResolver {
2045        self.package_resolver.clone()
2046    }
2047
2048    pub async fn pending_active_validators(
2049        &self,
2050    ) -> Result<Vec<IotaValidatorSummary>, IndexerError> {
2051        self.spawn_blocking(move |this| {
2052            iota_types::iota_system_state::get_iota_system_state(&this)
2053                .and_then(|system_state| system_state.get_pending_active_validators(&this))
2054        })
2055        .await
2056        .map_err(Into::into)
2057    }
2058
2059    /// Get the participation metrics. Participation is defined as the total
2060    /// number of unique addresses that have delegated stake in the current
2061    /// epoch. Includes both staked and timelocked staked IOTA.
2062    pub fn get_participation_metrics(&self) -> IndexerResult<ParticipationMetrics> {
2063        run_query!(&self.pool, |conn| {
2064            diesel::sql_query("SELECT * FROM participation_metrics")
2065                .get_result::<StoredParticipationMetrics>(conn)
2066        })
2067        .map(Into::into)
2068    }
2069}
2070
2071impl iota_types::storage::ObjectStore for IndexerReader {
2072    fn get_object(
2073        &self,
2074        object_id: &ObjectID,
2075    ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2076        self.get_object(object_id, None)
2077            .map_err(iota_types::storage::error::Error::custom)
2078    }
2079
2080    fn get_object_by_key(
2081        &self,
2082        object_id: &ObjectID,
2083        version: iota_types::base_types::VersionNumber,
2084    ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2085        self.get_object(object_id, Some(version))
2086            .map_err(iota_types::storage::error::Error::custom)
2087    }
2088}
2089
2090fn get_single_obj_id_from_package_publish(
2091    reader: &IndexerReader,
2092    package_id: ObjectID,
2093    obj_type: String,
2094) -> Result<Option<ObjectID>, IndexerError> {
2095    let publish_txn_effects_opt = if is_system_package(package_id) {
2096        Some(reader.get_transaction_effects_with_sequence_number(0))
2097    } else {
2098        reader.get_object(&package_id, None)?.map(|o| {
2099            let publish_txn_digest = o.previous_transaction;
2100            reader.get_transaction_effects_with_digest(publish_txn_digest)
2101        })
2102    };
2103    if let Some(publish_txn_effects) = publish_txn_effects_opt {
2104        let created_objs = publish_txn_effects?
2105            .created()
2106            .iter()
2107            .map(|o| o.object_id())
2108            .collect::<Vec<_>>();
2109        let obj_ids_with_type =
2110            reader.filter_object_id_with_type(created_objs, obj_type.clone())?;
2111        if obj_ids_with_type.len() == 1 {
2112            Ok(Some(obj_ids_with_type[0]))
2113        } else if obj_ids_with_type.is_empty() {
2114            // The package exists but no such object is created in that transaction. Or
2115            // maybe it is wrapped and we don't know yet.
2116            Ok(None)
2117        } else {
2118            // We expect there to be only one object of this type created by the package but
2119            // more than one is found.
2120            tracing::error!(
2121                "There are more than one objects found for type {}",
2122                obj_type
2123            );
2124            Ok(None)
2125        }
2126    } else {
2127        // The coin package does not exist.
2128        Ok(None)
2129    }
2130}