1use std::{
6 collections::HashSet,
7 sync::{Arc, Mutex},
8};
9
10use anyhow::{Result, anyhow};
11use cached::{Cached, SizedCache};
12use diesel::{
13 ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, PgConnection,
14 QueryDsl, RunQueryDsl, SelectableHelper, TextExpressionMethods,
15 dsl::sql,
16 r2d2::ConnectionManager,
17 sql_query,
18 sql_types::{BigInt, Bool},
19};
20use fastcrypto::encoding::{Encoding, Hex};
21use iota_json_rpc_types::{
22 AddressMetrics, Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo,
23 EventFilter, IotaCoinMetadata, IotaEvent, IotaMoveValue, IotaObjectDataFilter,
24 IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
25 IotaTransactionKind, MoveCallMetrics, MoveFunctionName, NetworkMetrics, ParticipationMetrics,
26 TransactionFilter,
27};
28use iota_package_resolver::{Package, PackageStore, PackageStoreWithLruCache, Resolver};
29use iota_types::{
30 TypeTag,
31 balance::Supply,
32 base_types::{IotaAddress, ObjectID, SequenceNumber, VersionNumber},
33 coin::{CoinMetadata, TreasuryCap},
34 coin_manager::CoinManager,
35 committee::EpochId,
36 digests::{ChainIdentifier, TransactionDigest},
37 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
38 effects::TransactionEvents,
39 event::EventID,
40 iota_system_state::{
41 IotaSystemStateTrait,
42 iota_system_state_summary::{IotaSystemStateSummary, IotaValidatorSummary},
43 },
44 is_system_package,
45 messages_checkpoint::CheckpointDigest,
46 object::{Object, ObjectRead, PastObjectRead, bounded_visitor::BoundedVisitor},
47};
48use itertools::Itertools;
49use move_core_types::{annotated_value::MoveStructLayout, language_storage::StructTag};
50use tap::TapFallible;
51
52use crate::{
53 db::{ConnectionConfig, ConnectionPool, ConnectionPoolConfig},
54 errors::IndexerError,
55 models::{
56 address_metrics::StoredAddressMetrics,
57 checkpoints::{StoredChainIdentifier, StoredCheckpoint},
58 display::StoredDisplay,
59 epoch::StoredEpochInfo,
60 events::StoredEvent,
61 move_call_metrics::QueriedMoveCallMetrics,
62 network_metrics::StoredNetworkMetrics,
63 obj_indices::StoredObjectVersion,
64 objects::{CoinBalance, StoredHistoryObject, StoredObject},
65 participation_metrics::StoredParticipationMetrics,
66 transactions::{
67 OptimisticTransaction, StoredTransaction, StoredTransactionEvents,
68 stored_events_to_events, tx_events_to_iota_tx_events,
69 },
70 tx_indices::TxSequenceNumber,
71 },
72 schema::{
73 address_metrics, addresses, chain_identifier, checkpoints, display, epochs, events,
74 objects, objects_history, objects_snapshot, objects_version, optimistic_transactions,
75 packages, pruner_cp_watermark, transactions, tx_digests, tx_insertion_order,
76 },
77 store::{diesel_macro::*, package_resolver::IndexerStorePackageResolver},
78 types::{IndexerResult, OwnerType},
79};
80
/// Name of the `tx_sequence_number` column, interpolated into hand-written SQL
/// fragments (cursor clauses and sub-selects) built by this module.
pub const TX_SEQUENCE_NUMBER_STR: &str = "tx_sequence_number";
/// The literal `transaction_digest`.
// NOTE(review): presumably a column name used in raw SQL elsewhere — confirm against callers.
pub const TRANSACTION_DIGEST_STR: &str = "transaction_digest";
/// The literal `event_sequence_number`.
// NOTE(review): presumably a column name used in raw SQL elsewhere — confirm against callers.
pub const EVENT_SEQUENCE_NUMBER_STR: &str = "event_sequence_number";
84
/// Handle used to run read queries against the indexer database.
///
/// All three fields are cloned by the `Clone` impl, so a copy can be handed
/// to another task.
pub struct IndexerReader {
    /// Connection pool every query of this type goes through.
    pool: ConnectionPool,
    /// Shared Move package resolver (see [`PackageResolver`]).
    package_resolver: PackageResolver,
    /// Size-bounded cache keyed by `String`, storing an optional `ObjectID`.
    // NOTE(review): the key/value meaning is not visible in this part of the
    // file — presumably object type -> package id; confirm at the use sites.
    package_obj_type_cache: Arc<Mutex<SizedCache<String, Option<ObjectID>>>>,
}
90
91impl Clone for IndexerReader {
92 fn clone(&self) -> IndexerReader {
93 IndexerReader {
94 pool: self.pool.clone(),
95 package_resolver: self.package_resolver.clone(),
96 package_obj_type_cache: self.package_obj_type_cache.clone(),
97 }
98 }
99}
100
/// Reference-counted package resolver whose store is an
/// `IndexerStorePackageResolver` wrapped in `PackageStoreWithLruCache`.
pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePackageResolver>>>;
102
103impl IndexerReader {
105 pub fn new<T: Into<String>>(db_url: T) -> Result<Self> {
106 let config = ConnectionPoolConfig::default();
107 Self::new_with_config(db_url, config)
108 }
109
110 pub fn new_with_config<T: Into<String>>(
111 db_url: T,
112 config: ConnectionPoolConfig,
113 ) -> Result<Self> {
114 let manager = ConnectionManager::<PgConnection>::new(db_url);
115
116 let connection_config = ConnectionConfig {
117 statement_timeout: config.statement_timeout,
118 read_only: true,
119 };
120
121 let pool = diesel::r2d2::Pool::builder()
122 .max_size(config.pool_size)
123 .connection_timeout(config.connection_timeout)
124 .connection_customizer(Box::new(connection_config))
125 .build(manager)
126 .map_err(|e| anyhow!("Failed to initialize connection pool. Error: {:?}. If Error is None, please check whether the configured pool size (currently {}) exceeds the maximum number of connections allowed by the database.", e, config.pool_size))?;
127
128 let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
129 let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
130 let package_resolver = Arc::new(Resolver::new(package_cache));
131 let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000)));
132 Ok(Self {
133 pool,
134 package_resolver,
135 package_obj_type_cache,
136 })
137 }
138
139 pub async fn spawn_blocking<F, R, E>(&self, f: F) -> Result<R, E>
140 where
141 F: FnOnce(Self) -> Result<R, E> + Send + 'static,
142 R: Send + 'static,
143 E: Send + 'static,
144 {
145 let this = self.clone();
146 let current_span = tracing::Span::current();
147 tokio::task::spawn_blocking(move || {
148 CALLED_FROM_BLOCKING_POOL
149 .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
150 let _guard = current_span.enter();
151 f(this)
152 })
153 .await
154 .expect("propagate any panics")
155 }
156
157 pub fn get_pool(&self) -> ConnectionPool {
158 self.pool.clone()
159 }
160}
161
162impl IndexerReader {
164 fn get_object_from_db(
165 &self,
166 object_id: &ObjectID,
167 version: Option<VersionNumber>,
168 ) -> Result<Option<StoredObject>, IndexerError> {
169 let object_id = object_id.to_vec();
170
171 let stored_object = run_query!(&self.pool, |conn| {
172 if let Some(version) = version {
173 objects::dsl::objects
174 .filter(objects::dsl::object_id.eq(object_id))
175 .filter(objects::dsl::object_version.eq(version.value() as i64))
176 .first::<StoredObject>(conn)
177 .optional()
178 } else {
179 objects::dsl::objects
180 .filter(objects::dsl::object_id.eq(object_id))
181 .first::<StoredObject>(conn)
182 .optional()
183 }
184 })?;
185 Ok(stored_object)
186 }
187
188 fn get_object(
189 &self,
190 object_id: &ObjectID,
191 version: Option<VersionNumber>,
192 ) -> Result<Option<Object>, IndexerError> {
193 let Some(stored_package) = self.get_object_from_db(object_id, version)? else {
194 return Ok(None);
195 };
196
197 let object = stored_package.try_into()?;
198 Ok(Some(object))
199 }
200
201 pub async fn get_object_in_blocking_task(
202 &self,
203 object_id: ObjectID,
204 ) -> Result<Option<Object>, IndexerError> {
205 self.spawn_blocking(move |this| this.get_object(&object_id, None))
206 .await
207 }
208
209 pub async fn get_object_read_in_blocking_task(
210 &self,
211 object_id: ObjectID,
212 ) -> Result<ObjectRead, IndexerError> {
213 let stored_object = self
214 .spawn_blocking(move |this| this.get_object_raw(object_id))
215 .await?;
216
217 if let Some(object) = stored_object {
218 object
219 .try_into_object_read(self.package_resolver.clone())
220 .await
221 } else {
222 Ok(ObjectRead::NotExists(object_id))
223 }
224 }
225
226 fn get_object_raw(&self, object_id: ObjectID) -> Result<Option<StoredObject>, IndexerError> {
227 let id = object_id.to_vec();
228 let stored_object = run_query!(&self.pool, |conn| {
229 objects::dsl::objects
230 .filter(objects::dsl::object_id.eq(id))
231 .first::<StoredObject>(conn)
232 .optional()
233 })?;
234 Ok(stored_object)
235 }
236
237 pub(crate) async fn get_past_object_read(
246 &self,
247 object_id: ObjectID,
248 object_version: SequenceNumber,
249 before_version: bool,
250 ) -> Result<PastObjectRead, IndexerError> {
251 let object_version_num = object_version.value() as i64;
252
253 let pool = self.get_pool();
256 let object_id_bytes = object_id.to_vec();
257 let object_version_info: Option<StoredObjectVersion> =
258 run_query_async!(&pool, move |conn| {
259 let mut query = objects_version::dsl::objects_version
260 .filter(objects_version::object_id.eq(&object_id_bytes))
261 .into_boxed();
262
263 if before_version {
264 query = query.filter(objects_version::object_version.lt(object_version_num));
265 } else {
266 query = query.filter(objects_version::object_version.eq(object_version_num));
267 }
268
269 query
270 .order_by(objects_version::object_version.desc())
271 .limit(1)
272 .first::<StoredObjectVersion>(conn)
273 .optional()
274 })?;
275
276 let Some(object_version_info) = object_version_info else {
277 let pool = self.get_pool();
279 let object_id_bytes = object_id.to_vec();
280 let latest_existing_version: Option<i64> = run_query_async!(&pool, move |conn| {
281 objects_version::dsl::objects_version
282 .filter(objects_version::object_id.eq(&object_id_bytes))
283 .order_by(objects_version::object_version.desc())
284 .select(objects_version::object_version)
285 .limit(1)
286 .first::<i64>(conn)
287 .optional()
288 })?;
289
290 return match latest_existing_version {
291 Some(latest) if object_version_num > latest => Ok(PastObjectRead::VersionTooHigh {
292 object_id,
293 asked_version: object_version,
294 latest_version: SequenceNumber::from(latest as u64),
295 }),
296 Some(_) => Ok(PastObjectRead::VersionNotFound(object_id, object_version)),
297 None => Ok(PastObjectRead::ObjectNotExists(object_id)),
298 };
299 };
300
301 let history_object = self
303 .get_stored_history_object(
304 object_id,
305 object_version_info.object_version,
306 object_version_info.cp_sequence_number,
307 )
308 .await?;
309
310 match history_object {
311 Some(obj) => {
312 obj.try_into_past_object_read(self.package_resolver.clone())
313 .await
314 }
315 None => Err(IndexerError::PersistentStorageDataCorruption(format!(
316 "Object version {} not found in objects_history for object {}",
317 object_version_info.object_version, object_id
318 ))),
319 }
320 }
321
322 pub async fn get_stored_history_object(
323 &self,
324 object_id: ObjectID,
325 object_version: i64,
326 checkpoint_sequence_number: i64,
327 ) -> Result<Option<StoredHistoryObject>, IndexerError> {
328 let pool = self.get_pool();
329 let object_id_bytes = object_id.to_vec();
330 run_query_async!(&pool, move |conn| {
331 let query = objects_history::dsl::objects_history
333 .filter(objects_history::checkpoint_sequence_number.eq(checkpoint_sequence_number))
334 .filter(objects_history::object_id.eq(&object_id_bytes))
335 .filter(objects_history::object_version.eq(object_version))
336 .into_boxed();
337
338 query
339 .order_by(objects_history::object_version.desc())
340 .limit(1)
341 .first::<StoredHistoryObject>(conn)
342 .optional()
343 })
344 }
345
346 pub async fn get_package(&self, package_id: ObjectID) -> Result<Package, IndexerError> {
347 let store = self.package_resolver.package_store();
348 let pkg = store
349 .fetch(package_id.into())
350 .await
351 .map_err(|e| {
352 IndexerError::PostgresRead(format!(
353 "Fail to fetch package from package store with error {:?}",
354 e
355 ))
356 })?
357 .as_ref()
358 .clone();
359 Ok(pkg)
360 }
361
362 pub fn get_epoch_info_from_db(
363 &self,
364 epoch: Option<EpochId>,
365 ) -> Result<Option<StoredEpochInfo>, IndexerError> {
366 let stored_epoch = run_query!(&self.pool, |conn| {
367 if let Some(epoch) = epoch {
368 epochs::dsl::epochs
369 .filter(epochs::epoch.eq(epoch as i64))
370 .first::<StoredEpochInfo>(conn)
371 .optional()
372 } else {
373 epochs::dsl::epochs
374 .order_by(epochs::epoch.desc())
375 .first::<StoredEpochInfo>(conn)
376 .optional()
377 }
378 })?;
379
380 Ok(stored_epoch)
381 }
382
383 pub fn get_latest_epoch_info_from_db(&self) -> Result<StoredEpochInfo, IndexerError> {
384 let stored_epoch = run_query!(&self.pool, |conn| {
385 epochs::dsl::epochs
386 .order_by(epochs::epoch.desc())
387 .first::<StoredEpochInfo>(conn)
388 })?;
389
390 Ok(stored_epoch)
391 }
392
393 pub fn get_epoch_info(
394 &self,
395 epoch: Option<EpochId>,
396 ) -> Result<Option<EpochInfo>, IndexerError> {
397 let stored_epoch = self.get_epoch_info_from_db(epoch)?;
398
399 let stored_epoch = match stored_epoch {
400 Some(stored_epoch) => stored_epoch,
401 None => return Ok(None),
402 };
403
404 let epoch_info = EpochInfo::try_from(stored_epoch)?;
405 Ok(Some(epoch_info))
406 }
407
408 fn get_epochs_from_db(
409 &self,
410 cursor: Option<u64>,
411 limit: usize,
412 descending_order: bool,
413 ) -> Result<Vec<StoredEpochInfo>, IndexerError> {
414 run_query!(&self.pool, |conn| {
415 let mut boxed_query = epochs::table.into_boxed();
416 if let Some(cursor) = cursor {
417 if descending_order {
418 boxed_query = boxed_query.filter(epochs::epoch.lt(cursor as i64));
419 } else {
420 boxed_query = boxed_query.filter(epochs::epoch.gt(cursor as i64));
421 }
422 }
423 if descending_order {
424 boxed_query = boxed_query.order_by(epochs::epoch.desc());
425 } else {
426 boxed_query = boxed_query.order_by(epochs::epoch.asc());
427 }
428
429 boxed_query.limit(limit as i64).load(conn)
430 })
431 }
432
433 pub fn get_epochs(
434 &self,
435 cursor: Option<u64>,
436 limit: usize,
437 descending_order: bool,
438 ) -> Result<Vec<EpochInfo>, IndexerError> {
439 self.get_epochs_from_db(cursor, limit, descending_order)?
440 .into_iter()
441 .map(EpochInfo::try_from)
442 .collect::<Result<Vec<_>, _>>()
443 }
444
445 pub fn get_latest_iota_system_state(&self) -> Result<IotaSystemStateSummary, IndexerError> {
446 let system_state: IotaSystemStateSummary =
447 iota_types::iota_system_state::get_iota_system_state(self)?
448 .into_iota_system_state_summary();
449 Ok(system_state)
450 }
451
452 pub fn get_epoch_iota_system_state(
459 &self,
460 epoch: Option<EpochId>,
461 ) -> Result<IotaSystemStateSummary, IndexerError> {
462 let stored_epoch = self.get_epoch_info_from_db(epoch)?;
463 let stored_epoch = match stored_epoch {
464 Some(stored_epoch) => stored_epoch,
465 None => return Err(IndexerError::InvalidArgument("Invalid epoch".into())),
466 };
467
468 let system_state: IotaSystemStateSummary = bcs::from_bytes(&stored_epoch.system_state)
469 .map_err(|_| {
470 IndexerError::PersistentStorageDataCorruption(format!(
471 "Failed to deserialize `system_state` for epoch {:?}",
472 epoch,
473 ))
474 })?;
475 Ok(system_state)
476 }
477
478 pub async fn get_chain_identifier_in_blocking_task(
479 &self,
480 ) -> Result<ChainIdentifier, IndexerError> {
481 self.spawn_blocking(|this| this.get_chain_identifier())
482 .await
483 }
484
485 pub fn get_chain_identifier(&self) -> Result<ChainIdentifier, IndexerError> {
486 let stored_chain_identifier = run_query!(&self.pool, |conn| {
487 chain_identifier::dsl::chain_identifier
488 .first::<StoredChainIdentifier>(conn)
489 .optional()
490 })?
491 .ok_or(IndexerError::PostgresRead(
492 "chain identifier not found".to_string(),
493 ))?;
494
495 let checkpoint_digest =
496 CheckpointDigest::try_from(stored_chain_identifier.checkpoint_digest).map_err(|e| {
497 IndexerError::PersistentStorageDataCorruption(format!(
498 "failed to decode chain identifier with err: {e:?}"
499 ))
500 })?;
501
502 Ok(checkpoint_digest.into())
503 }
504
505 pub fn get_checkpoint_from_db(
506 &self,
507 checkpoint_id: CheckpointId,
508 ) -> Result<Option<StoredCheckpoint>, IndexerError> {
509 let stored_checkpoint = run_query!(&self.pool, |conn| {
510 match checkpoint_id {
511 CheckpointId::SequenceNumber(seq) => checkpoints::dsl::checkpoints
512 .filter(checkpoints::sequence_number.eq(seq as i64))
513 .first::<StoredCheckpoint>(conn)
514 .optional(),
515 CheckpointId::Digest(digest) => checkpoints::dsl::checkpoints
516 .filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
517 .first::<StoredCheckpoint>(conn)
518 .optional(),
519 }
520 })?;
521
522 Ok(stored_checkpoint)
523 }
524
525 pub fn get_latest_checkpoint_from_db(&self) -> Result<StoredCheckpoint, IndexerError> {
526 let stored_checkpoint = run_query!(&self.pool, |conn| {
527 checkpoints::dsl::checkpoints
528 .order_by(checkpoints::sequence_number.desc())
529 .first::<StoredCheckpoint>(conn)
530 })?;
531
532 Ok(stored_checkpoint)
533 }
534
535 pub fn get_checkpoint(
536 &self,
537 checkpoint_id: CheckpointId,
538 ) -> Result<Option<iota_json_rpc_types::Checkpoint>, IndexerError> {
539 let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id)? {
540 Some(stored_checkpoint) => stored_checkpoint,
541 None => return Ok(None),
542 };
543
544 let checkpoint = iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)?;
545 Ok(Some(checkpoint))
546 }
547
548 pub fn get_latest_checkpoint(&self) -> Result<iota_json_rpc_types::Checkpoint, IndexerError> {
549 let stored_checkpoint = self.get_latest_checkpoint_from_db()?;
550
551 iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)
552 }
553
554 pub async fn get_latest_checkpoint_timestamp_ms_in_blocking_task(
555 &self,
556 ) -> Result<u64, IndexerError> {
557 self.spawn_blocking(|this| this.get_latest_checkpoint_timestamp_ms())
558 .await
559 }
560
561 pub fn get_latest_checkpoint_timestamp_ms(&self) -> Result<u64, IndexerError> {
562 Ok(self.get_latest_checkpoint()?.timestamp_ms)
563 }
564
565 fn get_checkpoints_from_db(
566 &self,
567 cursor: Option<u64>,
568 limit: usize,
569 descending_order: bool,
570 ) -> Result<Vec<StoredCheckpoint>, IndexerError> {
571 run_query!(&self.pool, |conn| {
572 let mut boxed_query = checkpoints::table.into_boxed();
573 if let Some(cursor) = cursor {
574 if descending_order {
575 boxed_query =
576 boxed_query.filter(checkpoints::sequence_number.lt(cursor as i64));
577 } else {
578 boxed_query =
579 boxed_query.filter(checkpoints::sequence_number.gt(cursor as i64));
580 }
581 }
582 if descending_order {
583 boxed_query = boxed_query.order_by(checkpoints::sequence_number.desc());
584 } else {
585 boxed_query = boxed_query.order_by(checkpoints::sequence_number.asc());
586 }
587
588 boxed_query
589 .limit(limit as i64)
590 .load::<StoredCheckpoint>(conn)
591 })
592 }
593
594 pub fn get_checkpoints(
595 &self,
596 cursor: Option<u64>,
597 limit: usize,
598 descending_order: bool,
599 ) -> Result<Vec<iota_json_rpc_types::Checkpoint>, IndexerError> {
600 self.get_checkpoints_from_db(cursor, limit, descending_order)?
601 .into_iter()
602 .map(iota_json_rpc_types::Checkpoint::try_from)
603 .collect()
604 }
605
606 fn get_transaction_effects_with_digest(
607 &self,
608 digest: TransactionDigest,
609 ) -> Result<IotaTransactionBlockEffects, IndexerError> {
610 let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
611 transactions::table
612 .filter(
613 transactions::tx_sequence_number
614 .nullable()
615 .eq(tx_digests::table
616 .select(tx_digests::tx_sequence_number)
617 .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
620 .single_value()),
621 )
622 .first::<StoredTransaction>(conn)
623 })?;
624
625 stored_txn.try_into_iota_transaction_effects()
626 }
627
628 fn get_transaction_effects_with_sequence_number(
629 &self,
630 sequence_number: i64,
631 ) -> Result<IotaTransactionBlockEffects, IndexerError> {
632 let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
633 transactions::table
634 .filter(transactions::tx_sequence_number.eq(sequence_number))
635 .first::<StoredTransaction>(conn)
636 })?;
637
638 stored_txn.try_into_iota_transaction_effects()
639 }
640
641 fn multi_get_transactions(
642 &self,
643 digests: &[TransactionDigest],
644 ) -> Result<Vec<StoredTransaction>, IndexerError> {
645 let digests = digests
646 .iter()
647 .map(|digest| digest.inner().to_vec())
648 .collect::<HashSet<_>>();
649 let checkpointed_txs = run_query!(&self.pool, |conn| {
650 transactions::table
651 .inner_join(
652 tx_digests::table
653 .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
654 )
655 .filter(tx_digests::tx_digest.eq_any(&digests))
658 .select(StoredTransaction::as_select())
659 .load::<StoredTransaction>(conn)
660 })?;
661 if checkpointed_txs.len() == digests.len() {
662 return Ok(checkpointed_txs);
663 }
664 let mut missing_digests = digests;
665 for tx in &checkpointed_txs {
666 missing_digests.remove(&tx.transaction_digest);
667 }
668 let optimistic_txs = run_query!(&self.pool, |conn| {
669 optimistic_transactions::table
670 .inner_join(
671 tx_insertion_order::table.on(optimistic_transactions::insertion_order
672 .eq(tx_insertion_order::insertion_order)),
673 )
674 .filter(tx_insertion_order::tx_digest.eq_any(missing_digests))
677 .select(OptimisticTransaction::as_select())
678 .load::<OptimisticTransaction>(conn)
679 })?;
680 Ok(checkpointed_txs
681 .into_iter()
682 .chain(optimistic_txs.into_iter().map(Into::into))
683 .collect())
684 }
685
686 async fn multi_get_transactions_in_blocking_task(
687 &self,
688 digests: Vec<TransactionDigest>,
689 ) -> Result<Vec<StoredTransaction>, IndexerError> {
690 self.spawn_blocking(move |this| this.multi_get_transactions(&digests))
691 .await
692 }
693
694 async fn stored_transaction_to_transaction_block(
697 &self,
698 stored_txes: Vec<StoredTransaction>,
699 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
700 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
701 let mut tx_block_responses_futures = vec![];
702 for stored_tx in stored_txes {
703 let package_resolver_clone = self.package_resolver();
704 let options_clone = options.clone();
705 tx_block_responses_futures.push(tokio::task::spawn(
706 stored_tx.try_into_iota_transaction_block_response(
707 options_clone,
708 package_resolver_clone,
709 ),
710 ));
711 }
712
713 let tx_blocks = futures::future::join_all(tx_block_responses_futures)
714 .await
715 .into_iter()
716 .collect::<Result<Vec<_>, _>>()
717 .tap_err(|e| tracing::error!("Failed to join all tx block futures: {}", e))?
718 .into_iter()
719 .collect::<Result<Vec<_>, _>>()
720 .tap_err(|e| tracing::error!("Failed to collect tx block futures: {}", e))?;
721 Ok(tx_blocks)
722 }
723
724 fn multi_get_transactions_with_sequence_numbers(
725 &self,
726 tx_sequence_numbers: Vec<i64>,
727 is_descending: Option<bool>,
729 ) -> Result<Vec<StoredTransaction>, IndexerError> {
730 let mut query = transactions::table
731 .filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers))
732 .into_boxed();
733 match is_descending {
734 Some(true) => {
735 query = query.order(transactions::dsl::tx_sequence_number.desc());
736 }
737 Some(false) => {
738 query = query.order(transactions::dsl::tx_sequence_number.asc());
739 }
740 None => (),
741 }
742 run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
743 }
744
745 pub async fn get_owned_objects_in_blocking_task(
746 &self,
747 address: IotaAddress,
748 filter: Option<IotaObjectDataFilter>,
749 cursor: Option<ObjectID>,
750 limit: usize,
751 ) -> Result<Vec<StoredObject>, IndexerError> {
752 self.spawn_blocking(move |this| this.get_owned_objects_impl(address, filter, cursor, limit))
753 .await
754 }
755
756 fn get_owned_objects_impl(
757 &self,
758 address: IotaAddress,
759 filter: Option<IotaObjectDataFilter>,
760 cursor: Option<ObjectID>,
761 limit: usize,
762 ) -> Result<Vec<StoredObject>, IndexerError> {
763 run_query!(&self.pool, |conn| {
764 let mut query = objects::dsl::objects
765 .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
766 .filter(objects::dsl::owner_id.eq(address.to_vec()))
767 .order(objects::dsl::object_id.asc())
768 .limit(limit as i64)
769 .into_boxed();
770 if let Some(filter) = filter {
771 match filter {
772 IotaObjectDataFilter::StructType(struct_tag) => {
773 let object_type =
774 struct_tag.to_canonical_string(true);
775 query =
776 query.filter(objects::object_type.like(format!("{}%", object_type)));
777 }
778 IotaObjectDataFilter::MatchAny(filters) => {
779 let mut condition = "(".to_string();
780 for (i, filter) in filters.iter().enumerate() {
781 if let IotaObjectDataFilter::StructType(struct_tag) = filter {
782 let object_type =
783 struct_tag.to_canonical_string(true);
784 if i == 0 {
785 condition +=
786 format!("objects.object_type LIKE '{}%'", object_type)
787 .as_str();
788 } else {
789 condition +=
790 format!(" OR objects.object_type LIKE '{}%'", object_type)
791 .as_str();
792 }
793 } else {
794 return Err(IndexerError::InvalidArgument(
795 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
796 ));
797 }
798 }
799 condition += ")";
800 query = query.filter(sql::<Bool>(&condition));
801 }
802 IotaObjectDataFilter::MatchNone(filters) => {
803 for filter in filters {
804 if let IotaObjectDataFilter::StructType(struct_tag) = filter {
805 let object_type =
806 struct_tag.to_canonical_string(true);
807 query = query.filter(
808 objects::object_type.not_like(format!("{}%", object_type)),
809 );
810 } else {
811 return Err(IndexerError::InvalidArgument(
812 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
813 ));
814 }
815 }
816 }
817 _ => {
818 return Err(IndexerError::InvalidArgument(
819 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
820 ));
821 }
822 }
823 }
824
825 if let Some(object_cursor) = cursor {
826 query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
827 }
828
829 query
830 .load::<StoredObject>(conn)
831 .map_err(|e| IndexerError::PostgresRead(e.to_string()))
832 })
833 }
834
835 fn get_singleton_object(&self, struct_tag: StructTag) -> Result<Option<Object>, IndexerError> {
836 let object_type = struct_tag.to_canonical_string(true);
837
838 run_query!(&self.pool, |conn| {
839 let object = match objects::dsl::objects
840 .filter(objects::object_type_package.eq(struct_tag.address.to_vec()))
841 .filter(objects::object_type_module.eq(struct_tag.module.to_string()))
842 .filter(objects::object_type_name.eq(struct_tag.name.to_string()))
843 .filter(objects::object_type.eq(object_type))
844 .first::<StoredObject>(conn)
845 .optional()
846 .map_err(|e| IndexerError::PostgresRead(e.to_string()))?
847 {
848 Some(object) => object,
849 None => return Ok::<Option<Object>, IndexerError>(None),
850 }
851 .try_into()?;
852 Ok(Some(object))
853 })
854 }
855
856 fn filter_object_id_with_type(
857 &self,
858 object_ids: Vec<ObjectID>,
859 object_type: String,
860 ) -> Result<Vec<ObjectID>, IndexerError> {
861 let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
862 let filtered_ids = run_query!(&self.pool, |conn| {
863 objects::dsl::objects
864 .filter(objects::object_id.eq_any(object_ids))
865 .filter(objects::object_type.eq(object_type))
866 .select(objects::object_id)
867 .load::<Vec<u8>>(conn)
868 })?;
869
870 filtered_ids
871 .into_iter()
872 .map(|id| {
873 ObjectID::from_bytes(id.clone()).map_err(|_e| {
874 IndexerError::PersistentStorageDataCorruption(format!(
875 "Can't convert {:?} to ObjectID",
876 id,
877 ))
878 })
879 })
880 .collect::<Result<Vec<_>, _>>()
881 }
882
883 pub async fn multi_get_objects_in_blocking_task(
884 &self,
885 object_ids: Vec<ObjectID>,
886 ) -> Result<Vec<StoredObject>, IndexerError> {
887 self.spawn_blocking(move |this| this.multi_get_objects_impl(object_ids))
888 .await
889 }
890
891 fn multi_get_objects_impl(
892 &self,
893 object_ids: Vec<ObjectID>,
894 ) -> Result<Vec<StoredObject>, IndexerError> {
895 let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
896 run_query!(&self.pool, |conn| {
897 objects::dsl::objects
898 .filter(objects::object_id.eq_any(object_ids))
899 .load::<StoredObject>(conn)
900 })
901 }
902
903 async fn query_transaction_blocks_by_checkpoint_impl(
904 &self,
905 checkpoint_seq: u64,
906 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
907 cursor_tx_seq: Option<i64>,
908 limit: usize,
909 is_descending: bool,
910 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
911 let pool = self.get_pool();
912 let tx_range: (i64, i64) = run_query_async!(&pool, move |conn| {
913 pruner_cp_watermark::dsl::pruner_cp_watermark
914 .select((
915 pruner_cp_watermark::min_tx_sequence_number,
916 pruner_cp_watermark::max_tx_sequence_number,
917 ))
918 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
921 .first::<(i64, i64)>(conn)
922 })?;
923
924 let mut query = transactions::dsl::transactions
925 .filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
926 .into_boxed();
927
928 if let Some(cursor_tx_seq) = cursor_tx_seq {
930 if is_descending {
931 query = query.filter(transactions::dsl::tx_sequence_number.lt(cursor_tx_seq));
932 } else {
933 query = query.filter(transactions::dsl::tx_sequence_number.gt(cursor_tx_seq));
934 }
935 }
936 if is_descending {
937 query = query.order(transactions::dsl::tx_sequence_number.desc());
938 } else {
939 query = query.order(transactions::dsl::tx_sequence_number.asc());
940 }
941 let pool = self.get_pool();
942 let stored_txes = run_query_async!(&pool, move |conn| query
943 .limit(limit as i64)
944 .load::<StoredTransaction>(conn))?;
945
946 self.stored_transaction_to_transaction_block(stored_txes, options)
947 .await
948 }
949
950 pub async fn query_transaction_blocks_in_blocking_task(
951 &self,
952 filter: Option<TransactionFilter>,
953 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
954 cursor: Option<TransactionDigest>,
955 limit: usize,
956 is_descending: bool,
957 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
958 self.query_transaction_blocks_impl(filter, options, cursor, limit, is_descending)
959 .await
960 }
961
962 async fn query_transaction_blocks_impl(
963 &self,
964 filter: Option<TransactionFilter>,
965 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
966 cursor: Option<TransactionDigest>,
967 limit: usize,
968 is_descending: bool,
969 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
970 let cursor_tx_seq = if let Some(cursor) = cursor {
971 let pool = self.get_pool();
972 let tx_seq = run_query_async!(&pool, move |conn| {
973 tx_digests::table
974 .select(tx_digests::tx_sequence_number)
975 .filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
978 .first::<i64>(conn)
979 })?;
980 Some(tx_seq)
981 } else {
982 None
983 };
984 let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
985 if is_descending {
986 format!("AND {TX_SEQUENCE_NUMBER_STR} < {}", cursor_tx_seq)
987 } else {
988 format!("AND {TX_SEQUENCE_NUMBER_STR} > {}", cursor_tx_seq)
989 }
990 } else {
991 "".to_string()
992 };
993 let order_str = if is_descending { "DESC" } else { "ASC" };
994 let (table_name, main_where_clause) = match filter {
995 Some(TransactionFilter::Checkpoint(seq)) => {
997 return self
998 .query_transaction_blocks_by_checkpoint_impl(
999 seq,
1000 options,
1001 cursor_tx_seq,
1002 limit,
1003 is_descending,
1004 )
1005 .await;
1006 }
1007 Some(TransactionFilter::MoveFunction {
1009 package,
1010 module,
1011 function,
1012 }) => {
1013 let package = Hex::encode(package.to_vec());
1014 match (module, function) {
1015 (Some(module), Some(function)) => (
1016 "tx_calls_fun".into(),
1017 format!(
1018 "package = '\\x{}'::bytea AND module = '{}' AND func = '{}'",
1019 package, module, function
1020 ),
1021 ),
1022 (Some(module), None) => (
1023 "tx_calls_mod".into(),
1024 format!(
1025 "package = '\\x{}'::bytea AND module = '{}'",
1026 package, module
1027 ),
1028 ),
1029 (None, Some(_)) => {
1030 return Err(IndexerError::InvalidArgument(
1031 "Function cannot be present without Module.".into(),
1032 ));
1033 }
1034 (None, None) => (
1035 "tx_calls_pkg".into(),
1036 format!("package = '\\x{}'::bytea", package),
1037 ),
1038 }
1039 }
1040 Some(TransactionFilter::InputObject(object_id)) => {
1041 let object_id = Hex::encode(object_id.to_vec());
1042 (
1043 "tx_input_objects".into(),
1044 format!("object_id = '\\x{}'::bytea", object_id),
1045 )
1046 }
1047 Some(TransactionFilter::ChangedObject(object_id)) => {
1048 let object_id = Hex::encode(object_id.to_vec());
1049 (
1050 "tx_changed_objects".into(),
1051 format!("object_id = '\\x{}'::bytea", object_id),
1052 )
1053 }
1054 Some(TransactionFilter::FromAddress(from_address)) => {
1055 let from_address = Hex::encode(from_address.to_vec());
1056 (
1057 "tx_senders".into(),
1058 format!("sender = '\\x{}'::bytea", from_address),
1059 )
1060 }
1061 Some(TransactionFilter::ToAddress(to_address)) => {
1062 let to_address = Hex::encode(to_address.to_vec());
1063 (
1064 "tx_recipients".into(),
1065 format!("recipient = '\\x{}'::bytea", to_address),
1066 )
1067 }
1068 Some(TransactionFilter::FromAndToAddress { from, to }) => {
1069 let from_address = Hex::encode(from.to_vec());
1070 let to_address = Hex::encode(to.to_vec());
1071 let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
1073 if is_descending {
1074 format!(
1075 "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} < {}",
1076 cursor_tx_seq
1077 )
1078 } else {
1079 format!(
1080 "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} > {}",
1081 cursor_tx_seq
1082 )
1083 }
1084 } else {
1085 "".to_string()
1086 };
1087 let inner_query = format!(
1088 "(SELECT tx_senders.{TX_SEQUENCE_NUMBER_STR} \
1089 FROM tx_senders \
1090 JOIN tx_recipients \
1091 ON tx_senders.{TX_SEQUENCE_NUMBER_STR} = tx_recipients.{TX_SEQUENCE_NUMBER_STR} \
1092 WHERE tx_senders.sender = '\\x{}'::BYTEA \
1093 AND tx_recipients.recipient = '\\x{}'::BYTEA \
1094 {} \
1095 ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1096 LIMIT {}) AS inner_query
1097 ",
1098 from_address,
1099 to_address,
1100 cursor_clause,
1101 order_str,
1102 limit,
1103 );
1104 (inner_query, "1 = 1".into())
1105 }
1106 Some(TransactionFilter::FromOrToAddress { addr }) => {
1107 let address = Hex::encode(addr.to_vec());
1108 let inner_query = format!(
1109 "( \
1110 ( \
1111 SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_senders \
1112 WHERE sender = '\\x{}'::BYTEA {} \
1113 ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1114 LIMIT {} \
1115 ) \
1116 UNION \
1117 ( \
1118 SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_recipients \
1119 WHERE recipient = '\\x{}'::BYTEA {} \
1120 ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1121 LIMIT {} \
1122 ) \
1123 ) AS combined",
1124 address,
1125 cursor_clause,
1126 order_str,
1127 limit,
1128 address,
1129 cursor_clause,
1130 order_str,
1131 limit,
1132 );
1133 (inner_query, "1 = 1".into())
1134 }
1135 Some(TransactionFilter::TransactionKind(kind)) => {
1136 if kind == IotaTransactionKind::SystemTransaction {
1139 ("tx_kinds".into(), "tx_kind != 1".to_string())
1140 } else {
1141 ("tx_kinds".into(), format!("tx_kind = {}", kind as u8))
1142 }
1143 }
1144 Some(TransactionFilter::TransactionKindIn(kind_vec)) => {
1145 if kind_vec.is_empty() {
1146 return Err(IndexerError::InvalidArgument(
1147 "no transaction kind provided".into(),
1148 ));
1149 }
1150
1151 let mut has_system_transaction = false;
1152 let mut has_programmable_transaction = false;
1153 let mut other_kinds = HashSet::new();
1154
1155 for kind in kind_vec.iter() {
1156 match kind {
1157 IotaTransactionKind::SystemTransaction => has_system_transaction = true,
1158 IotaTransactionKind::ProgrammableTransaction => {
1159 has_programmable_transaction = true
1160 }
1161 other => {
1162 other_kinds.insert(*other as u8);
1163 }
1164 }
1165 }
1166
1167 let query = if has_system_transaction {
1168 if !has_programmable_transaction {
1171 "tx_kind != 1".to_string()
1172 } else {
1173 "1 = 1".to_string()
1175 }
1176 } else {
1177 if has_programmable_transaction {
1179 other_kinds.insert(IotaTransactionKind::ProgrammableTransaction as u8);
1180 }
1181
1182 if other_kinds.is_empty() {
1183 "1 = 1".to_string()
1185 } else {
1186 let mut query = String::from("tx_kind IN (");
1187 query.push_str(
1188 &other_kinds
1189 .iter()
1190 .map(ToString::to_string)
1191 .collect::<Vec<_>>()
1192 .join(", "),
1193 );
1194 query.push(')');
1195 query
1196 }
1197 };
1198
1199 ("tx_kinds".into(), query)
1200 }
1201 None => {
1202 ("transactions".into(), "1 = 1".into())
1204 }
1205 };
1206
1207 let query = format!(
1208 "SELECT {TX_SEQUENCE_NUMBER_STR} FROM {} WHERE {} {} ORDER BY {TX_SEQUENCE_NUMBER_STR} {} LIMIT {}",
1209 table_name, main_where_clause, cursor_clause, order_str, limit,
1210 );
1211
1212 tracing::debug!("query transaction blocks: {}", query);
1213 let pool = self.get_pool();
1214 let tx_sequence_numbers = run_query_async!(&pool, move |conn| {
1215 diesel::sql_query(query.clone()).load::<TxSequenceNumber>(conn)
1216 })?
1217 .into_iter()
1218 .map(|tsn| tsn.tx_sequence_number)
1219 .collect::<Vec<i64>>();
1220 self.multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1221 tx_sequence_numbers,
1222 options,
1223 Some(is_descending),
1224 )
1225 .await
1226 }
1227
1228 async fn multi_get_transaction_block_response_in_blocking_task_impl(
1229 &self,
1230 digests: &[TransactionDigest],
1231 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1232 ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1233 let stored_txes = self
1234 .multi_get_transactions_in_blocking_task(digests.to_vec())
1235 .await?;
1236 self.stored_transaction_to_transaction_block(stored_txes, options)
1237 .await
1238 }
1239
1240 async fn multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1241 &self,
1242 tx_sequence_numbers: Vec<i64>,
1243 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1244 is_descending: Option<bool>,
1246 ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1247 let stored_txes: Vec<StoredTransaction> = self
1248 .spawn_blocking(move |this| {
1249 this.multi_get_transactions_with_sequence_numbers(
1250 tx_sequence_numbers,
1251 is_descending,
1252 )
1253 })
1254 .await?;
1255 self.stored_transaction_to_transaction_block(stored_txes, options)
1256 .await
1257 }
1258
1259 pub async fn multi_get_transaction_block_response_in_blocking_task(
1260 &self,
1261 digests: Vec<TransactionDigest>,
1262 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1263 ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1264 self.multi_get_transaction_block_response_in_blocking_task_impl(&digests, options)
1265 .await
1266 }
1267
1268 pub async fn get_transaction_events_in_blocking_task(
1269 &self,
1270 digest: TransactionDigest,
1271 ) -> Result<Vec<iota_json_rpc_types::IotaEvent>, IndexerError> {
1272 let pool = self.get_pool();
1273 let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| {
1274 transactions::table
1275 .filter(
1276 transactions::tx_sequence_number
1277 .nullable()
1278 .eq(tx_digests::table
1279 .select(tx_digests::tx_sequence_number)
1280 .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
1283 .single_value()),
1284 )
1285 .select((transactions::timestamp_ms, transactions::events))
1286 .first::<(i64, StoredTransactionEvents)>(conn)
1287 })?;
1288
1289 let events = stored_events_to_events(serialized_events)?;
1290 let tx_events = TransactionEvents { data: events };
1291
1292 let iota_tx_events = tx_events_to_iota_tx_events(
1293 tx_events,
1294 self.package_resolver(),
1295 digest,
1296 timestamp_ms as u64,
1297 )
1298 .await?;
1299 Ok(iota_tx_events.map_or(vec![], |ste| ste.data))
1300 }
1301
1302 async fn query_events_by_tx_digest(
1303 &self,
1304 tx_digest: TransactionDigest,
1305 cursor: Option<EventID>,
1306 limit: usize,
1307 descending_order: bool,
1308 ) -> IndexerResult<Vec<IotaEvent>> {
1309 let mut query = events::table.into_boxed();
1310
1311 if let Some(cursor) = cursor {
1312 if cursor.tx_digest != tx_digest {
1313 return Err(IndexerError::InvalidArgument(
1314 "Cursor tx_digest does not match the tx_digest in the query.".into(),
1315 ));
1316 }
1317 if descending_order {
1318 query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
1319 } else {
1320 query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
1321 }
1322 } else if descending_order {
1323 query = query.filter(events::event_sequence_number.le(i64::MAX));
1324 } else {
1325 query = query.filter(events::event_sequence_number.ge(0));
1326 };
1327
1328 if descending_order {
1329 query = query.order(events::event_sequence_number.desc());
1330 } else {
1331 query = query.order(events::event_sequence_number.asc());
1332 }
1333
1334 query = query.filter(
1335 events::tx_sequence_number.nullable().eq(tx_digests::table
1336 .select(tx_digests::tx_sequence_number)
1337 .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1340 .single_value()),
1341 );
1342
1343 let pool = self.get_pool();
1344 let stored_events = run_query_async!(&pool, move |conn| {
1345 query.limit(limit as i64).load::<StoredEvent>(conn)
1346 })?;
1347
1348 let mut iota_event_futures = vec![];
1349 for stored_event in stored_events {
1350 iota_event_futures.push(tokio::task::spawn(
1351 stored_event.try_into_iota_event(self.package_resolver.clone()),
1352 ));
1353 }
1354
1355 let iota_events = futures::future::join_all(iota_event_futures)
1356 .await
1357 .into_iter()
1358 .collect::<Result<Vec<_>, _>>()
1359 .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
1360 .into_iter()
1361 .collect::<Result<Vec<_>, _>>()
1362 .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
1363 Ok(iota_events)
1364 }
1365
    /// Queries stored events matching `filter`, paginated by `cursor` and
    /// `limit`, ordered by (transaction sequence number, event sequence
    /// number) in the direction selected by `descending_order`.
    ///
    /// * `EventFilter::Sender` joins `tx_senders` with `events`.
    /// * `EventFilter::Transaction` is delegated to
    ///   `query_events_by_tx_digest`.
    /// * `Package`, `MoveModule`, `MoveEventType` and `MoveEventModule` query
    ///   the `events` table directly.
    /// * Every other filter variant is rejected.
    ///
    /// # Errors
    ///
    /// Returns [`IndexerError::NotSupported`] for unsupported filters and
    /// propagates query, task-join and event conversion failures. Resolving
    /// the cursor's transaction digest is itself a query and may fail.
    pub async fn query_events_in_blocking_task(
        &self,
        filter: EventFilter,
        cursor: Option<EventID>,
        limit: usize,
        descending_order: bool,
    ) -> IndexerResult<Vec<IotaEvent>> {
        let pool = self.get_pool();
        // Work out the exclusive starting position of the page.
        let (tx_seq, event_seq) = if let Some(cursor) = cursor {
            let EventID {
                tx_digest,
                event_seq,
            } = cursor;
            // Translate the cursor's digest into a transaction sequence number.
            let tx_seq = run_query_async!(&pool, move |conn| {
                transactions::dsl::transactions
                    .select(transactions::tx_sequence_number)
                    .filter(
                        transactions::tx_sequence_number
                            .nullable()
                            .eq(tx_digests::table
                                .select(tx_digests::tx_sequence_number)
                                .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
                                .single_value()),
                    )
                    .first::<i64>(conn)
            })?;
            (tx_seq, event_seq as i64)
        } else if descending_order {
            // No cursor, descending: start from the largest representable position.
            let max_tx_seq = i64::MAX;
            let max_event_seq = i64::MAX;
            (max_tx_seq, max_event_seq)
        } else {
            // No cursor, ascending: every sequence number >= 0 satisfies `> -1`.
            (-1, 0)
        };

        let query = if let EventFilter::Sender(sender) = &filter {
            // Sender filter: columns are qualified with the `e` alias because
            // the query joins `tx_senders s` with `events e`.
            let cursor_clause = if descending_order {
                format!(
                    "(e.{TX_SEQUENCE_NUMBER_STR} < {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} < {}))",
                    tx_seq, tx_seq, event_seq
                )
            } else {
                format!(
                    "(e.{TX_SEQUENCE_NUMBER_STR} > {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} > {}))",
                    tx_seq, tx_seq, event_seq
                )
            };
            let order_clause = if descending_order {
                format!("e.{TX_SEQUENCE_NUMBER_STR} DESC, e.{EVENT_SEQUENCE_NUMBER_STR} DESC")
            } else {
                format!("e.{TX_SEQUENCE_NUMBER_STR} ASC, e.{EVENT_SEQUENCE_NUMBER_STR} ASC")
            };
            format!(
                "( \
                    SELECT *
                    FROM tx_senders s
                    JOIN events e
                    ON e.tx_sequence_number = s.tx_sequence_number
                    AND s.sender = '\\x{}'::bytea
                    WHERE {} \
                    ORDER BY {} \
                    LIMIT {}
                )",
                Hex::encode(sender.to_vec()),
                cursor_clause,
                order_clause,
                limit,
            )
        } else if let EventFilter::Transaction(tx_digest) = filter {
            // Transaction filter has its own query path.
            return self
                .query_events_by_tx_digest(tx_digest, cursor, limit, descending_order)
                .await;
        } else {
            // Remaining supported filters become a WHERE clause on `events`.
            let main_where_clause = match filter {
                EventFilter::Package(package_id) => {
                    format!("package = '\\x{}'::bytea", package_id.to_hex())
                }
                EventFilter::MoveModule { package, module } => {
                    format!(
                        "package = '\\x{}'::bytea AND module = '{}'",
                        package.to_hex(),
                        module,
                    )
                }
                EventFilter::MoveEventType(struct_tag) => {
                    let formatted_struct_tag = struct_tag.to_canonical_string(true);
                    format!("event_type = '{formatted_struct_tag}'")
                }
                EventFilter::MoveEventModule { package, module } => {
                    // Prefix match on the event type: `<package>::<module>::%`.
                    let package_module_prefix = format!("{}::{}", package.to_hex_literal(), module);
                    format!("event_type LIKE '{package_module_prefix}::%'")
                }
                EventFilter::Sender(_) => {
                    // Handled by the first branch of the enclosing `if`.
                    unreachable!()
                }
                EventFilter::Transaction(_) => {
                    // Handled by the second branch of the enclosing `if`.
                    unreachable!()
                }
                EventFilter::MoveEventField { .. }
                | EventFilter::All(_)
                | EventFilter::Any(_)
                | EventFilter::And(_, _)
                | EventFilter::Or(_, _)
                | EventFilter::TimeRange { .. } => {
                    return Err(IndexerError::NotSupported(
                        "This type of EventFilter is not supported.".into(),
                    ));
                }
            };

            // Same comparison as the sender branch, but on unqualified
            // columns and prefixed with AND to follow the main WHERE clause.
            let cursor_clause = if descending_order {
                format!(
                    "AND ({TX_SEQUENCE_NUMBER_STR} < {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} < {}))",
                    tx_seq, tx_seq, event_seq
                )
            } else {
                format!(
                    "AND ({TX_SEQUENCE_NUMBER_STR} > {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} > {}))",
                    tx_seq, tx_seq, event_seq
                )
            };
            let order_clause = if descending_order {
                format!("{TX_SEQUENCE_NUMBER_STR} DESC, {EVENT_SEQUENCE_NUMBER_STR} DESC")
            } else {
                format!("{TX_SEQUENCE_NUMBER_STR} ASC, {EVENT_SEQUENCE_NUMBER_STR} ASC")
            };

            format!(
                "
                    SELECT * FROM events \
                    WHERE {} {} \
                    ORDER BY {} \
                    LIMIT {}
                ",
                main_where_clause, cursor_clause, order_clause, limit,
            )
        };
        tracing::debug!("query events: {}", query);
        let pool = self.get_pool();
        let stored_events = run_query_async!(&pool, move |conn| diesel::sql_query(query)
            .load::<StoredEvent>(conn))?;

        // Convert each stored event on its own spawned task.
        let mut iota_event_futures = vec![];
        for stored_event in stored_events {
            iota_event_futures.push(tokio::task::spawn(
                stored_event.try_into_iota_event(self.package_resolver.clone()),
            ));
        }

        // First collect surfaces task-join errors, second collect surfaces
        // conversion errors.
        let iota_events = futures::future::join_all(iota_event_futures)
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
        Ok(iota_events)
    }
1530
1531 pub async fn get_dynamic_fields_in_blocking_task(
1532 &self,
1533 parent_object_id: ObjectID,
1534 cursor: Option<ObjectID>,
1535 limit: usize,
1536 ) -> Result<Vec<DynamicFieldInfo>, IndexerError> {
1537 let stored_objects = self
1538 .spawn_blocking(move |this| {
1539 this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1540 })
1541 .await?;
1542
1543 let mut df_futures = vec![];
1544 let indexer_reader_arc = Arc::new(self.clone());
1545 for stored_object in stored_objects {
1546 let indexer_reader_arc_clone = Arc::clone(&indexer_reader_arc);
1547 df_futures.push(tokio::task::spawn(async move {
1548 indexer_reader_arc_clone
1549 .try_create_dynamic_field_info(stored_object)
1550 .await
1551 }));
1552 }
1553 let df_infos = futures::future::try_join_all(df_futures)
1554 .await
1555 .tap_err(|e| tracing::error!("Error joining DF futures: {:?}", e))?
1556 .into_iter()
1557 .collect::<Result<Vec<_>, _>>()
1558 .tap_err(|e| {
1559 tracing::error!(
1560 "Error calling DF try_create_dynamic_field_info function: {:?}",
1561 e
1562 )
1563 })?
1564 .into_iter()
1565 .flatten()
1566 .collect::<Vec<_>>();
1567 Ok(df_infos)
1568 }
1569
1570 pub async fn get_dynamic_fields_raw_in_blocking_task(
1571 &self,
1572 parent_object_id: ObjectID,
1573 cursor: Option<ObjectID>,
1574 limit: usize,
1575 ) -> Result<Vec<StoredObject>, IndexerError> {
1576 self.spawn_blocking(move |this| {
1577 this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1578 })
1579 .await
1580 }
1581
1582 fn get_dynamic_fields_raw(
1583 &self,
1584 parent_object_id: ObjectID,
1585 cursor: Option<ObjectID>,
1586 limit: usize,
1587 ) -> Result<Vec<StoredObject>, IndexerError> {
1588 let objects: Vec<StoredObject> = run_query!(&self.pool, |conn| {
1589 let mut query = objects::dsl::objects
1590 .filter(objects::dsl::owner_type.eq(OwnerType::Object as i16))
1591 .filter(objects::dsl::owner_id.eq(parent_object_id.to_vec()))
1592 .order(objects::dsl::object_id.asc())
1593 .limit(limit as i64)
1594 .into_boxed();
1595 if let Some(object_cursor) = cursor {
1596 query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
1597 }
1598 query.load::<StoredObject>(conn)
1599 })?;
1600
1601 Ok(objects)
1602 }
1603
1604 async fn try_create_dynamic_field_info(
1605 &self,
1606 stored_object: StoredObject,
1607 ) -> Result<Option<DynamicFieldInfo>, IndexerError> {
1608 if stored_object.df_kind.is_none() {
1609 return Ok(None);
1610 }
1611
1612 let object: Object = stored_object.try_into()?;
1613 let Some(move_object) = object.data.try_as_move().cloned() else {
1614 return Err(IndexerError::ResolveMoveStruct(
1615 "Object is not a MoveObject".to_string(),
1616 ));
1617 };
1618 let type_tag: TypeTag = move_object.type_().clone().into();
1619 let layout = self
1620 .package_resolver
1621 .type_layout(type_tag.clone())
1622 .await
1623 .map_err(|e| {
1624 IndexerError::ResolveMoveStruct(format!(
1625 "Failed to get type layout for type {}: {e}",
1626 type_tag.to_canonical_display(true),
1627 ))
1628 })?;
1629
1630 let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
1631 .tap_err(|e| tracing::warn!("{e}"))?;
1632
1633 let type_ = field.kind;
1634 let name_type: TypeTag = field.name_layout.into();
1635 let bcs_name = field.name_bytes.to_owned();
1636
1637 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
1638 .tap_err(|e| tracing::warn!("{e}"))?;
1639
1640 let name = DynamicFieldName {
1641 type_: name_type,
1642 value: IotaMoveValue::from(name_value).to_json_value(),
1643 };
1644
1645 let value_metadata = field.value_metadata().map_err(|e| {
1646 tracing::warn!("{e}");
1647 IndexerError::Uncategorized(anyhow!(e))
1648 })?;
1649
1650 Ok(Some(match value_metadata {
1651 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
1652 name,
1653 bcs_name,
1654 type_,
1655 object_type: object_type.to_canonical_string(true),
1656 object_id: object.id(),
1657 version: object.version(),
1658 digest: object.digest(),
1659 },
1660
1661 DFV::ValueMetadata::DynamicObjectField(object_id) => {
1662 let object = self
1663 .get_object_in_blocking_task(object_id)
1664 .await?
1665 .ok_or_else(|| {
1666 IndexerError::Uncategorized(anyhow!(
1667 "Failed to find object_id {} when trying to create dynamic field info",
1668 object_id.to_canonical_display(true),
1669 ))
1670 })?;
1671
1672 let object_type = object.data.type_().unwrap().clone();
1673 DynamicFieldInfo {
1674 name,
1675 bcs_name,
1676 type_,
1677 object_type: object_type.to_canonical_string(true),
1678 object_id,
1679 version: object.version(),
1680 digest: object.digest(),
1681 }
1682 }
1683 }))
1684 }
1685
1686 pub async fn bcs_name_from_dynamic_field_name(
1687 &self,
1688 name: &DynamicFieldName,
1689 ) -> Result<Vec<u8>, IndexerError> {
1690 let move_type_layout = self
1691 .package_resolver()
1692 .type_layout(name.type_.clone())
1693 .await
1694 .map_err(|e| {
1695 IndexerError::ResolveMoveStruct(format!(
1696 "Failed to get type layout for type {}: {}",
1697 name.type_, e
1698 ))
1699 })?;
1700 let iota_json_value = iota_json::IotaJsonValue::new(name.value.clone())?;
1701 let name_bcs_value = iota_json_value.to_bcs_bytes(&move_type_layout)?;
1702 Ok(name_bcs_value)
1703 }
1704
1705 pub async fn get_display_object_by_type(
1706 &self,
1707 object_type: &move_core_types::language_storage::StructTag,
1708 ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1709 let object_type = object_type.to_canonical_string(true);
1710 self.spawn_blocking(move |this| this.get_display_update_event(object_type))
1711 .await
1712 }
1713
1714 fn get_display_update_event(
1715 &self,
1716 object_type: String,
1717 ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1718 let stored_display = run_query!(&self.pool, |conn| {
1719 display::table
1720 .filter(display::object_type.eq(object_type))
1721 .first::<StoredDisplay>(conn)
1722 .optional()
1723 })?;
1724
1725 let stored_display = match stored_display {
1726 Some(display) => display,
1727 None => return Ok(None),
1728 };
1729
1730 let display_update = stored_display.to_display_update_event()?;
1731
1732 Ok(Some(display_update))
1733 }
1734
1735 pub async fn get_owned_coins_in_blocking_task(
1736 &self,
1737 owner: IotaAddress,
1738 coin_type: Option<String>,
1739 cursor: ObjectID,
1740 limit: usize,
1741 ) -> Result<Vec<IotaCoin>, IndexerError> {
1742 self.spawn_blocking(move |this| this.get_owned_coins(owner, coin_type, cursor, limit))
1743 .await
1744 }
1745
1746 fn get_owned_coins(
1747 &self,
1748 owner: IotaAddress,
1749 coin_type: Option<String>,
1751 cursor: ObjectID,
1752 limit: usize,
1753 ) -> Result<Vec<IotaCoin>, IndexerError> {
1754 let mut query = objects::dsl::objects
1755 .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
1756 .filter(objects::dsl::owner_id.eq(owner.to_vec()))
1757 .filter(objects::dsl::object_id.gt(cursor.to_vec()))
1758 .into_boxed();
1759 if let Some(coin_type) = coin_type {
1760 query = query.filter(objects::dsl::coin_type.eq(Some(coin_type)));
1761 } else {
1762 query = query.filter(objects::dsl::coin_type.is_not_null());
1763 }
1764 query = query
1765 .order(objects::dsl::object_id.asc())
1766 .limit(limit as i64);
1767
1768 let stored_objects = run_query!(&self.pool, |conn| query.load::<StoredObject>(conn))?;
1769
1770 stored_objects
1771 .into_iter()
1772 .map(|o| o.try_into())
1773 .collect::<IndexerResult<Vec<_>>>()
1774 }
1775
1776 pub async fn get_coin_balances_in_blocking_task(
1777 &self,
1778 owner: IotaAddress,
1779 coin_type: Option<String>,
1781 ) -> Result<Vec<Balance>, IndexerError> {
1782 self.spawn_blocking(move |this| this.get_coin_balances(owner, coin_type))
1783 .await
1784 }
1785
1786 fn get_coin_balances(
1787 &self,
1788 owner: IotaAddress,
1789 coin_type: Option<String>,
1791 ) -> Result<Vec<Balance>, IndexerError> {
1792 let coin_type_filter = if let Some(coin_type) = coin_type {
1793 format!("= '{}'", coin_type)
1794 } else {
1795 "IS NOT NULL".to_string()
1796 };
1797 let query = format!(
1799 "
1800 SELECT coin_type, \
1801 CAST(COUNT(*) AS BIGINT) AS coin_num, \
1802 CAST(SUM(coin_balance) AS BIGINT) AS coin_balance \
1803 FROM objects \
1804 WHERE owner_type = {} \
1805 AND owner_id = '\\x{}'::BYTEA \
1806 AND coin_type {} \
1807 GROUP BY coin_type \
1808 ORDER BY coin_type ASC
1809 ",
1810 OwnerType::Address as i16,
1811 Hex::encode(owner.to_vec()),
1812 coin_type_filter,
1813 );
1814
1815 tracing::debug!("get coin balances query: {query}");
1816 let coin_balances = run_query!(&self.pool, |conn| diesel::sql_query(query)
1817 .load::<CoinBalance>(conn))?;
1818 coin_balances
1819 .into_iter()
1820 .map(|cb| cb.try_into())
1821 .collect::<IndexerResult<Vec<_>>>()
1822 }
1823
1824 pub fn get_latest_network_metrics(&self) -> IndexerResult<NetworkMetrics> {
1825 let mut metrics = run_query!(&self.pool, |conn| {
1826 diesel::sql_query("SELECT * FROM network_metrics;")
1827 .get_result::<StoredNetworkMetrics>(conn)
1828 })?;
1829 if metrics.total_addresses == -1 {
1830 metrics.total_addresses = run_query!(&self.pool, |conn| {
1833 addresses::dsl::addresses.count().get_result::<i64>(conn)
1834 })?;
1835 }
1836 if metrics.total_packages == -1 {
1837 metrics.total_packages = run_query!(&self.pool, |conn| {
1840 packages::dsl::packages.count().get_result::<i64>(conn)
1841 })?;
1842 }
1843 Ok(metrics.into())
1844 }
1845
1846 pub fn get_latest_move_call_metrics(&self) -> IndexerResult<MoveCallMetrics> {
1848 let latest_3_days = self.get_latest_move_call_metrics_by_day(3)?;
1849 let latest_7_days = self.get_latest_move_call_metrics_by_day(7)?;
1850 let latest_30_days = self.get_latest_move_call_metrics_by_day(30)?;
1851
1852 let rank_3_days = latest_3_days
1854 .into_iter()
1855 .sorted_by(|a, b| b.1.cmp(&a.1))
1856 .collect::<Vec<_>>();
1857 let rank_7_days = latest_7_days
1858 .into_iter()
1859 .sorted_by(|a, b| b.1.cmp(&a.1))
1860 .collect::<Vec<_>>();
1861 let rank_30_days = latest_30_days
1862 .into_iter()
1863 .sorted_by(|a, b| b.1.cmp(&a.1))
1864 .collect::<Vec<_>>();
1865
1866 Ok(MoveCallMetrics {
1867 rank_3_days,
1868 rank_7_days,
1869 rank_30_days,
1870 })
1871 }
1872
1873 pub fn get_latest_move_call_metrics_by_day(
1875 &self,
1876 day_value: i64,
1877 ) -> IndexerResult<Vec<(MoveFunctionName, usize)>> {
1878 let query = "
1879 SELECT id, epoch, day, move_package, move_module, move_function, count
1880 FROM move_call_metrics
1881 WHERE day = $1
1882 AND epoch = (SELECT MAX(epoch) FROM move_call_metrics WHERE day = $1)
1883 ORDER BY count DESC
1884 LIMIT 10
1885 ";
1886
1887 let queried_metrics = run_query!(&self.pool, |conn| sql_query(query)
1888 .bind::<BigInt, _>(day_value)
1889 .load::<QueriedMoveCallMetrics>(conn))?;
1890
1891 let metrics = queried_metrics
1892 .into_iter()
1893 .map(|m| {
1894 m.try_into()
1895 .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))
1896 })
1897 .collect::<Result<Vec<_>, _>>()?;
1898
1899 Ok(metrics)
1900 }
1901
1902 pub fn get_latest_address_metrics(&self) -> IndexerResult<AddressMetrics> {
1903 let stored_address_metrics = run_query!(&self.pool, |conn| {
1904 address_metrics::table
1905 .order(address_metrics::dsl::checkpoint.desc())
1906 .first::<StoredAddressMetrics>(conn)
1907 })?;
1908 Ok(stored_address_metrics.into())
1909 }
1910
1911 pub fn get_checkpoint_address_metrics(
1912 &self,
1913 checkpoint_seq: u64,
1914 ) -> IndexerResult<AddressMetrics> {
1915 let stored_address_metrics = run_query!(&self.pool, |conn| {
1916 address_metrics::table
1917 .filter(address_metrics::dsl::checkpoint.eq(checkpoint_seq as i64))
1918 .first::<StoredAddressMetrics>(conn)
1919 })?;
1920 Ok(stored_address_metrics.into())
1921 }
1922
1923 pub fn get_all_epoch_address_metrics(
1924 &self,
1925 descending_order: Option<bool>,
1926 ) -> IndexerResult<Vec<AddressMetrics>> {
1927 let is_descending = descending_order.unwrap_or_default();
1928 let epoch_address_metrics_query = format!(
1929 "WITH ranked_rows AS (
1930 SELECT
1931 checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses,
1932 row_number() OVER(PARTITION BY epoch ORDER BY checkpoint DESC) as row_num
1933 FROM
1934 address_metrics
1935 )
1936 SELECT
1937 checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses
1938 FROM ranked_rows
1939 WHERE row_num = 1 ORDER BY epoch {}",
1940 if is_descending { "DESC" } else { "ASC" },
1941 );
1942 let epoch_address_metrics = run_query!(&self.pool, |conn| {
1943 diesel::sql_query(epoch_address_metrics_query).load::<StoredAddressMetrics>(conn)
1944 })?;
1945
1946 Ok(epoch_address_metrics
1947 .into_iter()
1948 .map(|stored_address_metrics| stored_address_metrics.into())
1949 .collect())
1950 }
1951
1952 pub(crate) async fn get_display_fields(
1953 &self,
1954 original_object: &iota_types::object::Object,
1955 original_layout: &Option<MoveStructLayout>,
1956 ) -> Result<DisplayFieldsResponse, IndexerError> {
1957 let (object_type, layout) = if let Some((object_type, layout)) =
1958 iota_json_rpc::read_api::get_object_type_and_struct(original_object, original_layout)
1959 .map_err(|e| IndexerError::Generic(e.to_string()))?
1960 {
1961 (object_type, layout)
1962 } else {
1963 return Ok(DisplayFieldsResponse {
1964 data: None,
1965 error: None,
1966 });
1967 };
1968
1969 if let Some(display_object) = self.get_display_object_by_type(&object_type).await? {
1970 return iota_json_rpc::read_api::get_rendered_fields(display_object.fields, &layout)
1971 .map_err(|e| IndexerError::Generic(e.to_string()));
1972 }
1973 Ok(DisplayFieldsResponse {
1974 data: None,
1975 error: None,
1976 })
1977 }
1978
1979 pub async fn get_coin_metadata_in_blocking_task(
1980 &self,
1981 coin_struct: StructTag,
1982 ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1983 self.spawn_blocking(move |this| this.get_coin_metadata(coin_struct))
1984 .await
1985 }
1986
1987 fn get_coin_metadata(
1988 &self,
1989 coin_struct: StructTag,
1990 ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1991 let coin_metadata_type = CoinMetadata::type_(coin_struct.clone());
1992 let metadata_object = self
1993 .get_singleton_object(coin_metadata_type)?
1994 .and_then(|o| IotaCoinMetadata::try_from(o).ok());
1995
1996 if let Some(metadata_object) = metadata_object {
1997 Ok(Some(metadata_object))
1998 } else {
1999 let coin_manager_obj = self.get_coin_manager_obj(coin_struct)?;
2000 Ok(
2001 coin_manager_obj.and_then(|m| match (m.metadata, m.immutable_metadata) {
2002 (Some(metadata), _) => Some(metadata.into()),
2003 (_, Some(immutable_metadata)) => Some(IotaCoinMetadata {
2004 decimals: immutable_metadata.decimals,
2005 name: immutable_metadata.name,
2006 symbol: immutable_metadata.symbol,
2007 description: immutable_metadata.description,
2008 icon_url: immutable_metadata.icon_url,
2009 id: None,
2010 }),
2011 (None, None) => None,
2012 }),
2013 )
2014 }
2015 }
2016
2017 fn get_coin_manager_obj(
2018 &self,
2019 coin_type: StructTag,
2020 ) -> Result<Option<CoinManager>, IndexerError> {
2021 let coin_manager_type = CoinManager::type_(coin_type);
2022 let coin_manager_object = self
2023 .get_singleton_object(coin_manager_type)?
2024 .and_then(|o| CoinManager::try_from(o).ok());
2025 Ok(coin_manager_object)
2026 }
2027
2028 pub async fn get_total_supply_in_blocking_task(
2029 &self,
2030 coin_struct: StructTag,
2031 ) -> Result<Supply, IndexerError> {
2032 self.spawn_blocking(move |this| this.get_total_supply(coin_struct))
2033 .await
2034 }
2035
2036 fn get_total_supply(&self, coin_struct: StructTag) -> Result<Supply, IndexerError> {
2037 let package_id = coin_struct.address.into();
2038 let treasury_cap_type =
2039 TreasuryCap::type_(coin_struct).to_canonical_string(true);
2040 let treasury_cap_obj_id = self
2041 .package_obj_type_cache
2042 .lock()
2043 .unwrap()
2044 .cache_get_or_set_with(format!("{}{}", package_id, treasury_cap_type), || {
2045 get_single_obj_id_from_package_publish(self, package_id, treasury_cap_type.clone())
2046 .unwrap()
2047 })
2048 .ok_or(IndexerError::Generic(format!(
2049 "Cannot find treasury cap for type {}",
2050 treasury_cap_type
2051 )))?;
2052 let treasury_cap_obj_object =
2053 self.get_object(&treasury_cap_obj_id, None)?
2054 .ok_or(IndexerError::Generic(format!(
2055 "Cannot find treasury cap object with id {}",
2056 treasury_cap_obj_id
2057 )))?;
2058 Ok(TreasuryCap::try_from(treasury_cap_obj_object)?.total_supply)
2059 }
2060
2061 pub fn get_consistent_read_range(&self) -> Result<(i64, i64), IndexerError> {
2062 let latest_checkpoint_sequence = run_query!(&self.pool, |conn| {
2063 checkpoints::table
2064 .select(checkpoints::sequence_number)
2065 .order(checkpoints::sequence_number.desc())
2066 .first::<i64>(conn)
2067 .optional()
2068 })?
2069 .unwrap_or_default();
2070 let latest_object_snapshot_checkpoint_sequence = run_query!(&self.pool, |conn| {
2071 objects_snapshot::table
2072 .select(objects_snapshot::checkpoint_sequence_number)
2073 .order(objects_snapshot::checkpoint_sequence_number.desc())
2074 .first::<i64>(conn)
2075 .optional()
2076 })?
2077 .unwrap_or_default();
2078 Ok((
2079 latest_object_snapshot_checkpoint_sequence,
2080 latest_checkpoint_sequence,
2081 ))
2082 }
2083
2084 pub fn package_resolver(&self) -> PackageResolver {
2085 self.package_resolver.clone()
2086 }
2087
2088 pub async fn pending_active_validators(
2089 &self,
2090 ) -> Result<Vec<IotaValidatorSummary>, IndexerError> {
2091 self.spawn_blocking(move |this| {
2092 iota_types::iota_system_state::get_iota_system_state(&this)
2093 .and_then(|system_state| system_state.get_pending_active_validators(&this))
2094 })
2095 .await
2096 .map_err(Into::into)
2097 }
2098
2099 pub fn get_participation_metrics(&self) -> IndexerResult<ParticipationMetrics> {
2103 run_query!(&self.pool, |conn| {
2104 diesel::sql_query("SELECT * FROM participation_metrics")
2105 .get_result::<StoredParticipationMetrics>(conn)
2106 })
2107 .map(Into::into)
2108 }
2109}
2110
2111impl iota_types::storage::ObjectStore for IndexerReader {
2112 fn get_object(
2113 &self,
2114 object_id: &ObjectID,
2115 ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2116 self.get_object(object_id, None)
2117 .map_err(iota_types::storage::error::Error::custom)
2118 }
2119
2120 fn get_object_by_key(
2121 &self,
2122 object_id: &ObjectID,
2123 version: iota_types::base_types::VersionNumber,
2124 ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2125 self.get_object(object_id, Some(version))
2126 .map_err(iota_types::storage::error::Error::custom)
2127 }
2128}
2129
2130fn get_single_obj_id_from_package_publish(
2131 reader: &IndexerReader,
2132 package_id: ObjectID,
2133 obj_type: String,
2134) -> Result<Option<ObjectID>, IndexerError> {
2135 let publish_txn_effects_opt = if is_system_package(package_id) {
2136 Some(reader.get_transaction_effects_with_sequence_number(0))
2137 } else {
2138 reader.get_object(&package_id, None)?.map(|o| {
2139 let publish_txn_digest = o.previous_transaction;
2140 reader.get_transaction_effects_with_digest(publish_txn_digest)
2141 })
2142 };
2143 if let Some(publish_txn_effects) = publish_txn_effects_opt {
2144 let created_objs = publish_txn_effects?
2145 .created()
2146 .iter()
2147 .map(|o| o.object_id())
2148 .collect::<Vec<_>>();
2149 let obj_ids_with_type =
2150 reader.filter_object_id_with_type(created_objs, obj_type.clone())?;
2151 if obj_ids_with_type.len() == 1 {
2152 Ok(Some(obj_ids_with_type[0]))
2153 } else if obj_ids_with_type.is_empty() {
2154 Ok(None)
2157 } else {
2158 tracing::error!(
2161 "There are more than one objects found for type {}",
2162 obj_type
2163 );
2164 Ok(None)
2165 }
2166 } else {
2167 Ok(None)
2169 }
2170}