1use std::{
6 collections::HashSet,
7 sync::{Arc, Mutex},
8};
9
10use anyhow::{Result, anyhow};
11use cached::{Cached, SizedCache};
12use diesel::{
13 ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, PgConnection,
14 QueryDsl, RunQueryDsl, SelectableHelper, TextExpressionMethods,
15 dsl::sql,
16 r2d2::ConnectionManager,
17 sql_query,
18 sql_types::{BigInt, Bool},
19};
20use fastcrypto::encoding::{Encoding, Hex};
21use iota_json_rpc_types::{
22 AddressMetrics, Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo,
23 EventFilter, IotaCoinMetadata, IotaEvent, IotaMoveValue, IotaObjectDataFilter,
24 IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
25 IotaTransactionKind, MoveCallMetrics, MoveFunctionName, NetworkMetrics, ParticipationMetrics,
26 TransactionFilter,
27};
28use iota_package_resolver::{Package, PackageStore, PackageStoreWithLruCache, Resolver};
29use iota_types::{
30 TypeTag,
31 balance::Supply,
32 base_types::{IotaAddress, ObjectID, SequenceNumber, VersionNumber},
33 coin::{CoinMetadata, TreasuryCap},
34 committee::EpochId,
35 digests::{ChainIdentifier, TransactionDigest},
36 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
37 effects::TransactionEvents,
38 event::EventID,
39 iota_system_state::{
40 IotaSystemStateTrait,
41 iota_system_state_summary::{IotaSystemStateSummary, IotaValidatorSummary},
42 },
43 is_system_package,
44 messages_checkpoint::CheckpointDigest,
45 object::{Object, ObjectRead, PastObjectRead, bounded_visitor::BoundedVisitor},
46};
47use itertools::Itertools;
48use move_core_types::{annotated_value::MoveStructLayout, language_storage::StructTag};
49use tap::TapFallible;
50
51use crate::{
52 db::{ConnectionConfig, ConnectionPool, ConnectionPoolConfig},
53 errors::IndexerError,
54 models::{
55 address_metrics::StoredAddressMetrics,
56 checkpoints::{StoredChainIdentifier, StoredCheckpoint},
57 display::StoredDisplay,
58 epoch::StoredEpochInfo,
59 events::StoredEvent,
60 move_call_metrics::QueriedMoveCallMetrics,
61 network_metrics::StoredNetworkMetrics,
62 obj_indices::StoredObjectVersion,
63 objects::{CoinBalance, StoredHistoryObject, StoredObject},
64 participation_metrics::StoredParticipationMetrics,
65 transactions::{
66 OptimisticTransaction, StoredTransaction, StoredTransactionEvents,
67 stored_events_to_events, tx_events_to_iota_tx_events,
68 },
69 tx_indices::TxSequenceNumber,
70 },
71 schema::{
72 address_metrics, addresses, chain_identifier, checkpoints, display, epochs, events,
73 objects, objects_history, objects_snapshot, objects_version, optimistic_transactions,
74 packages, pruner_cp_watermark, transactions, tx_digests, tx_insertion_order,
75 },
76 store::{diesel_macro::*, package_resolver::IndexerStorePackageResolver},
77 types::{IndexerResult, OwnerType},
78};
79
80pub const TX_SEQUENCE_NUMBER_STR: &str = "tx_sequence_number";
81pub const TRANSACTION_DIGEST_STR: &str = "transaction_digest";
82pub const EVENT_SEQUENCE_NUMBER_STR: &str = "event_sequence_number";
83
84pub struct IndexerReader {
85 pool: ConnectionPool,
86 package_resolver: PackageResolver,
87 package_obj_type_cache: Arc<Mutex<SizedCache<String, Option<ObjectID>>>>,
88}
89
90impl Clone for IndexerReader {
91 fn clone(&self) -> IndexerReader {
92 IndexerReader {
93 pool: self.pool.clone(),
94 package_resolver: self.package_resolver.clone(),
95 package_obj_type_cache: self.package_obj_type_cache.clone(),
96 }
97 }
98}
99
100pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePackageResolver>>>;
101
102impl IndexerReader {
104 pub fn new<T: Into<String>>(db_url: T) -> Result<Self> {
105 let config = ConnectionPoolConfig::default();
106 Self::new_with_config(db_url, config)
107 }
108
109 pub fn new_with_config<T: Into<String>>(
110 db_url: T,
111 config: ConnectionPoolConfig,
112 ) -> Result<Self> {
113 let manager = ConnectionManager::<PgConnection>::new(db_url);
114
115 let connection_config = ConnectionConfig {
116 statement_timeout: config.statement_timeout,
117 read_only: true,
118 };
119
120 let pool = diesel::r2d2::Pool::builder()
121 .max_size(config.pool_size)
122 .connection_timeout(config.connection_timeout)
123 .connection_customizer(Box::new(connection_config))
124 .build(manager)
125 .map_err(|e| anyhow!("Failed to initialize connection pool. Error: {:?}. If Error is None, please check whether the configured pool size (currently {}) exceeds the maximum number of connections allowed by the database.", e, config.pool_size))?;
126
127 let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
128 let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
129 let package_resolver = Arc::new(Resolver::new(package_cache));
130 let package_obj_type_cache = Arc::new(Mutex::new(SizedCache::with_size(10000)));
131 Ok(Self {
132 pool,
133 package_resolver,
134 package_obj_type_cache,
135 })
136 }
137
138 pub async fn spawn_blocking<F, R, E>(&self, f: F) -> Result<R, E>
139 where
140 F: FnOnce(Self) -> Result<R, E> + Send + 'static,
141 R: Send + 'static,
142 E: Send + 'static,
143 {
144 let this = self.clone();
145 let current_span = tracing::Span::current();
146 tokio::task::spawn_blocking(move || {
147 CALLED_FROM_BLOCKING_POOL
148 .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
149 let _guard = current_span.enter();
150 f(this)
151 })
152 .await
153 .expect("propagate any panics")
154 }
155
156 pub fn get_pool(&self) -> ConnectionPool {
157 self.pool.clone()
158 }
159}
160
161impl IndexerReader {
163 fn get_object_from_db(
164 &self,
165 object_id: &ObjectID,
166 version: Option<VersionNumber>,
167 ) -> Result<Option<StoredObject>, IndexerError> {
168 let object_id = object_id.to_vec();
169
170 let stored_object = run_query!(&self.pool, |conn| {
171 if let Some(version) = version {
172 objects::dsl::objects
173 .filter(objects::dsl::object_id.eq(object_id))
174 .filter(objects::dsl::object_version.eq(version.value() as i64))
175 .first::<StoredObject>(conn)
176 .optional()
177 } else {
178 objects::dsl::objects
179 .filter(objects::dsl::object_id.eq(object_id))
180 .first::<StoredObject>(conn)
181 .optional()
182 }
183 })?;
184 Ok(stored_object)
185 }
186
187 fn get_object(
188 &self,
189 object_id: &ObjectID,
190 version: Option<VersionNumber>,
191 ) -> Result<Option<Object>, IndexerError> {
192 let Some(stored_package) = self.get_object_from_db(object_id, version)? else {
193 return Ok(None);
194 };
195
196 let object = stored_package.try_into()?;
197 Ok(Some(object))
198 }
199
200 pub async fn get_object_in_blocking_task(
201 &self,
202 object_id: ObjectID,
203 ) -> Result<Option<Object>, IndexerError> {
204 self.spawn_blocking(move |this| this.get_object(&object_id, None))
205 .await
206 }
207
208 pub async fn get_object_read_in_blocking_task(
209 &self,
210 object_id: ObjectID,
211 ) -> Result<ObjectRead, IndexerError> {
212 let stored_object = self
213 .spawn_blocking(move |this| this.get_object_raw(object_id))
214 .await?;
215
216 if let Some(object) = stored_object {
217 object
218 .try_into_object_read(self.package_resolver.clone())
219 .await
220 } else {
221 Ok(ObjectRead::NotExists(object_id))
222 }
223 }
224
225 fn get_object_raw(&self, object_id: ObjectID) -> Result<Option<StoredObject>, IndexerError> {
226 let id = object_id.to_vec();
227 let stored_object = run_query!(&self.pool, |conn| {
228 objects::dsl::objects
229 .filter(objects::dsl::object_id.eq(id))
230 .first::<StoredObject>(conn)
231 .optional()
232 })?;
233 Ok(stored_object)
234 }
235
236 pub(crate) async fn get_past_object_read(
245 &self,
246 object_id: ObjectID,
247 object_version: SequenceNumber,
248 before_version: bool,
249 ) -> Result<PastObjectRead, IndexerError> {
250 let object_version_num = object_version.value() as i64;
251
252 let pool = self.get_pool();
255 let object_id_bytes = object_id.to_vec();
256 let object_version_info: Option<StoredObjectVersion> =
257 run_query_async!(&pool, move |conn| {
258 let mut query = objects_version::dsl::objects_version
259 .filter(objects_version::object_id.eq(&object_id_bytes))
260 .into_boxed();
261
262 if before_version {
263 query = query.filter(objects_version::object_version.lt(object_version_num));
264 } else {
265 query = query.filter(objects_version::object_version.eq(object_version_num));
266 }
267
268 query
269 .order_by(objects_version::object_version.desc())
270 .limit(1)
271 .first::<StoredObjectVersion>(conn)
272 .optional()
273 })?;
274
275 let Some(object_version_info) = object_version_info else {
276 let pool = self.get_pool();
278 let object_id_bytes = object_id.to_vec();
279 let latest_existing_version: Option<i64> = run_query_async!(&pool, move |conn| {
280 objects_version::dsl::objects_version
281 .filter(objects_version::object_id.eq(&object_id_bytes))
282 .order_by(objects_version::object_version.desc())
283 .select(objects_version::object_version)
284 .limit(1)
285 .first::<i64>(conn)
286 .optional()
287 })?;
288
289 return match latest_existing_version {
290 Some(latest) if object_version_num > latest => Ok(PastObjectRead::VersionTooHigh {
291 object_id,
292 asked_version: object_version,
293 latest_version: SequenceNumber::from(latest as u64),
294 }),
295 Some(_) => Ok(PastObjectRead::VersionNotFound(object_id, object_version)),
296 None => Ok(PastObjectRead::ObjectNotExists(object_id)),
297 };
298 };
299
300 let history_object = self
302 .get_stored_history_object(
303 object_id,
304 object_version_info.object_version,
305 object_version_info.cp_sequence_number,
306 )
307 .await?;
308
309 match history_object {
310 Some(obj) => {
311 obj.try_into_past_object_read(self.package_resolver.clone())
312 .await
313 }
314 None => Err(IndexerError::PersistentStorageDataCorruption(format!(
315 "Object version {} not found in objects_history for object {}",
316 object_version_info.object_version, object_id
317 ))),
318 }
319 }
320
321 pub async fn get_stored_history_object(
322 &self,
323 object_id: ObjectID,
324 object_version: i64,
325 checkpoint_sequence_number: i64,
326 ) -> Result<Option<StoredHistoryObject>, IndexerError> {
327 let pool = self.get_pool();
328 let object_id_bytes = object_id.to_vec();
329 run_query_async!(&pool, move |conn| {
330 let query = objects_history::dsl::objects_history
332 .filter(objects_history::checkpoint_sequence_number.eq(checkpoint_sequence_number))
333 .filter(objects_history::object_id.eq(&object_id_bytes))
334 .filter(objects_history::object_version.eq(object_version))
335 .into_boxed();
336
337 query
338 .order_by(objects_history::object_version.desc())
339 .limit(1)
340 .first::<StoredHistoryObject>(conn)
341 .optional()
342 })
343 }
344
345 pub async fn get_package(&self, package_id: ObjectID) -> Result<Package, IndexerError> {
346 let store = self.package_resolver.package_store();
347 let pkg = store
348 .fetch(package_id.into())
349 .await
350 .map_err(|e| {
351 IndexerError::PostgresRead(format!(
352 "Fail to fetch package from package store with error {:?}",
353 e
354 ))
355 })?
356 .as_ref()
357 .clone();
358 Ok(pkg)
359 }
360
361 pub fn get_epoch_info_from_db(
362 &self,
363 epoch: Option<EpochId>,
364 ) -> Result<Option<StoredEpochInfo>, IndexerError> {
365 let stored_epoch = run_query!(&self.pool, |conn| {
366 if let Some(epoch) = epoch {
367 epochs::dsl::epochs
368 .filter(epochs::epoch.eq(epoch as i64))
369 .first::<StoredEpochInfo>(conn)
370 .optional()
371 } else {
372 epochs::dsl::epochs
373 .order_by(epochs::epoch.desc())
374 .first::<StoredEpochInfo>(conn)
375 .optional()
376 }
377 })?;
378
379 Ok(stored_epoch)
380 }
381
382 pub fn get_latest_epoch_info_from_db(&self) -> Result<StoredEpochInfo, IndexerError> {
383 let stored_epoch = run_query!(&self.pool, |conn| {
384 epochs::dsl::epochs
385 .order_by(epochs::epoch.desc())
386 .first::<StoredEpochInfo>(conn)
387 })?;
388
389 Ok(stored_epoch)
390 }
391
392 pub fn get_epoch_info(
393 &self,
394 epoch: Option<EpochId>,
395 ) -> Result<Option<EpochInfo>, IndexerError> {
396 let stored_epoch = self.get_epoch_info_from_db(epoch)?;
397
398 let stored_epoch = match stored_epoch {
399 Some(stored_epoch) => stored_epoch,
400 None => return Ok(None),
401 };
402
403 let epoch_info = EpochInfo::try_from(stored_epoch)?;
404 Ok(Some(epoch_info))
405 }
406
407 fn get_epochs_from_db(
408 &self,
409 cursor: Option<u64>,
410 limit: usize,
411 descending_order: bool,
412 ) -> Result<Vec<StoredEpochInfo>, IndexerError> {
413 run_query!(&self.pool, |conn| {
414 let mut boxed_query = epochs::table.into_boxed();
415 if let Some(cursor) = cursor {
416 if descending_order {
417 boxed_query = boxed_query.filter(epochs::epoch.lt(cursor as i64));
418 } else {
419 boxed_query = boxed_query.filter(epochs::epoch.gt(cursor as i64));
420 }
421 }
422 if descending_order {
423 boxed_query = boxed_query.order_by(epochs::epoch.desc());
424 } else {
425 boxed_query = boxed_query.order_by(epochs::epoch.asc());
426 }
427
428 boxed_query.limit(limit as i64).load(conn)
429 })
430 }
431
432 pub fn get_epochs(
433 &self,
434 cursor: Option<u64>,
435 limit: usize,
436 descending_order: bool,
437 ) -> Result<Vec<EpochInfo>, IndexerError> {
438 self.get_epochs_from_db(cursor, limit, descending_order)?
439 .into_iter()
440 .map(EpochInfo::try_from)
441 .collect::<Result<Vec<_>, _>>()
442 }
443
444 pub fn get_latest_iota_system_state(&self) -> Result<IotaSystemStateSummary, IndexerError> {
445 let system_state: IotaSystemStateSummary =
446 iota_types::iota_system_state::get_iota_system_state(self)?
447 .into_iota_system_state_summary();
448 Ok(system_state)
449 }
450
451 pub fn get_epoch_iota_system_state(
458 &self,
459 epoch: Option<EpochId>,
460 ) -> Result<IotaSystemStateSummary, IndexerError> {
461 let stored_epoch = self.get_epoch_info_from_db(epoch)?;
462 let stored_epoch = match stored_epoch {
463 Some(stored_epoch) => stored_epoch,
464 None => return Err(IndexerError::InvalidArgument("Invalid epoch".into())),
465 };
466
467 let system_state: IotaSystemStateSummary = bcs::from_bytes(&stored_epoch.system_state)
468 .map_err(|_| {
469 IndexerError::PersistentStorageDataCorruption(format!(
470 "Failed to deserialize `system_state` for epoch {:?}",
471 epoch,
472 ))
473 })?;
474 Ok(system_state)
475 }
476
477 pub async fn get_chain_identifier_in_blocking_task(
478 &self,
479 ) -> Result<ChainIdentifier, IndexerError> {
480 self.spawn_blocking(|this| this.get_chain_identifier())
481 .await
482 }
483
484 pub fn get_chain_identifier(&self) -> Result<ChainIdentifier, IndexerError> {
485 let stored_chain_identifier = run_query!(&self.pool, |conn| {
486 chain_identifier::dsl::chain_identifier
487 .first::<StoredChainIdentifier>(conn)
488 .optional()
489 })?
490 .ok_or(IndexerError::PostgresRead(
491 "chain identifier not found".to_string(),
492 ))?;
493
494 let checkpoint_digest =
495 CheckpointDigest::try_from(stored_chain_identifier.checkpoint_digest).map_err(|e| {
496 IndexerError::PersistentStorageDataCorruption(format!(
497 "failed to decode chain identifier with err: {e:?}"
498 ))
499 })?;
500
501 Ok(checkpoint_digest.into())
502 }
503
504 pub fn get_checkpoint_from_db(
505 &self,
506 checkpoint_id: CheckpointId,
507 ) -> Result<Option<StoredCheckpoint>, IndexerError> {
508 let stored_checkpoint = run_query!(&self.pool, |conn| {
509 match checkpoint_id {
510 CheckpointId::SequenceNumber(seq) => checkpoints::dsl::checkpoints
511 .filter(checkpoints::sequence_number.eq(seq as i64))
512 .first::<StoredCheckpoint>(conn)
513 .optional(),
514 CheckpointId::Digest(digest) => checkpoints::dsl::checkpoints
515 .filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
516 .first::<StoredCheckpoint>(conn)
517 .optional(),
518 }
519 })?;
520
521 Ok(stored_checkpoint)
522 }
523
524 pub fn get_latest_checkpoint_from_db(&self) -> Result<StoredCheckpoint, IndexerError> {
525 let stored_checkpoint = run_query!(&self.pool, |conn| {
526 checkpoints::dsl::checkpoints
527 .order_by(checkpoints::sequence_number.desc())
528 .first::<StoredCheckpoint>(conn)
529 })?;
530
531 Ok(stored_checkpoint)
532 }
533
534 pub fn get_checkpoint(
535 &self,
536 checkpoint_id: CheckpointId,
537 ) -> Result<Option<iota_json_rpc_types::Checkpoint>, IndexerError> {
538 let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id)? {
539 Some(stored_checkpoint) => stored_checkpoint,
540 None => return Ok(None),
541 };
542
543 let checkpoint = iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)?;
544 Ok(Some(checkpoint))
545 }
546
547 pub fn get_latest_checkpoint(&self) -> Result<iota_json_rpc_types::Checkpoint, IndexerError> {
548 let stored_checkpoint = self.get_latest_checkpoint_from_db()?;
549
550 iota_json_rpc_types::Checkpoint::try_from(stored_checkpoint)
551 }
552
553 pub async fn get_latest_checkpoint_timestamp_ms_in_blocking_task(
554 &self,
555 ) -> Result<u64, IndexerError> {
556 self.spawn_blocking(|this| this.get_latest_checkpoint_timestamp_ms())
557 .await
558 }
559
560 pub fn get_latest_checkpoint_timestamp_ms(&self) -> Result<u64, IndexerError> {
561 Ok(self.get_latest_checkpoint()?.timestamp_ms)
562 }
563
564 fn get_checkpoints_from_db(
565 &self,
566 cursor: Option<u64>,
567 limit: usize,
568 descending_order: bool,
569 ) -> Result<Vec<StoredCheckpoint>, IndexerError> {
570 run_query!(&self.pool, |conn| {
571 let mut boxed_query = checkpoints::table.into_boxed();
572 if let Some(cursor) = cursor {
573 if descending_order {
574 boxed_query =
575 boxed_query.filter(checkpoints::sequence_number.lt(cursor as i64));
576 } else {
577 boxed_query =
578 boxed_query.filter(checkpoints::sequence_number.gt(cursor as i64));
579 }
580 }
581 if descending_order {
582 boxed_query = boxed_query.order_by(checkpoints::sequence_number.desc());
583 } else {
584 boxed_query = boxed_query.order_by(checkpoints::sequence_number.asc());
585 }
586
587 boxed_query
588 .limit(limit as i64)
589 .load::<StoredCheckpoint>(conn)
590 })
591 }
592
593 pub fn get_checkpoints(
594 &self,
595 cursor: Option<u64>,
596 limit: usize,
597 descending_order: bool,
598 ) -> Result<Vec<iota_json_rpc_types::Checkpoint>, IndexerError> {
599 self.get_checkpoints_from_db(cursor, limit, descending_order)?
600 .into_iter()
601 .map(iota_json_rpc_types::Checkpoint::try_from)
602 .collect()
603 }
604
605 fn get_transaction_effects_with_digest(
606 &self,
607 digest: TransactionDigest,
608 ) -> Result<IotaTransactionBlockEffects, IndexerError> {
609 let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
610 transactions::table
611 .filter(
612 transactions::tx_sequence_number
613 .nullable()
614 .eq(tx_digests::table
615 .select(tx_digests::tx_sequence_number)
616 .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
619 .single_value()),
620 )
621 .first::<StoredTransaction>(conn)
622 })?;
623
624 stored_txn.try_into_iota_transaction_effects()
625 }
626
627 fn get_transaction_effects_with_sequence_number(
628 &self,
629 sequence_number: i64,
630 ) -> Result<IotaTransactionBlockEffects, IndexerError> {
631 let stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
632 transactions::table
633 .filter(transactions::tx_sequence_number.eq(sequence_number))
634 .first::<StoredTransaction>(conn)
635 })?;
636
637 stored_txn.try_into_iota_transaction_effects()
638 }
639
640 fn multi_get_transactions(
641 &self,
642 digests: &[TransactionDigest],
643 ) -> Result<Vec<StoredTransaction>, IndexerError> {
644 let digests = digests
645 .iter()
646 .map(|digest| digest.inner().to_vec())
647 .collect::<HashSet<_>>();
648 let checkpointed_txs = run_query!(&self.pool, |conn| {
649 transactions::table
650 .inner_join(
651 tx_digests::table
652 .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
653 )
654 .filter(tx_digests::tx_digest.eq_any(&digests))
657 .select(StoredTransaction::as_select())
658 .load::<StoredTransaction>(conn)
659 })?;
660 if checkpointed_txs.len() == digests.len() {
661 return Ok(checkpointed_txs);
662 }
663 let mut missing_digests = digests;
664 for tx in &checkpointed_txs {
665 missing_digests.remove(&tx.transaction_digest);
666 }
667 let optimistic_txs = run_query!(&self.pool, |conn| {
668 optimistic_transactions::table
669 .inner_join(
670 tx_insertion_order::table.on(optimistic_transactions::insertion_order
671 .eq(tx_insertion_order::insertion_order)),
672 )
673 .filter(tx_insertion_order::tx_digest.eq_any(missing_digests))
676 .select(OptimisticTransaction::as_select())
677 .load::<OptimisticTransaction>(conn)
678 })?;
679 Ok(checkpointed_txs
680 .into_iter()
681 .chain(optimistic_txs.into_iter().map(Into::into))
682 .collect())
683 }
684
685 async fn multi_get_transactions_in_blocking_task(
686 &self,
687 digests: Vec<TransactionDigest>,
688 ) -> Result<Vec<StoredTransaction>, IndexerError> {
689 self.spawn_blocking(move |this| this.multi_get_transactions(&digests))
690 .await
691 }
692
693 async fn stored_transaction_to_transaction_block(
696 &self,
697 stored_txes: Vec<StoredTransaction>,
698 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
699 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
700 let mut tx_block_responses_futures = vec![];
701 for stored_tx in stored_txes {
702 let package_resolver_clone = self.package_resolver();
703 let options_clone = options.clone();
704 tx_block_responses_futures.push(tokio::task::spawn(
705 stored_tx.try_into_iota_transaction_block_response(
706 options_clone,
707 package_resolver_clone,
708 ),
709 ));
710 }
711
712 let tx_blocks = futures::future::join_all(tx_block_responses_futures)
713 .await
714 .into_iter()
715 .collect::<Result<Vec<_>, _>>()
716 .tap_err(|e| tracing::error!("Failed to join all tx block futures: {}", e))?
717 .into_iter()
718 .collect::<Result<Vec<_>, _>>()
719 .tap_err(|e| tracing::error!("Failed to collect tx block futures: {}", e))?;
720 Ok(tx_blocks)
721 }
722
723 fn multi_get_transactions_with_sequence_numbers(
724 &self,
725 tx_sequence_numbers: Vec<i64>,
726 is_descending: Option<bool>,
728 ) -> Result<Vec<StoredTransaction>, IndexerError> {
729 let mut query = transactions::table
730 .filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers))
731 .into_boxed();
732 match is_descending {
733 Some(true) => {
734 query = query.order(transactions::dsl::tx_sequence_number.desc());
735 }
736 Some(false) => {
737 query = query.order(transactions::dsl::tx_sequence_number.asc());
738 }
739 None => (),
740 }
741 run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
742 }
743
744 pub async fn get_owned_objects_in_blocking_task(
745 &self,
746 address: IotaAddress,
747 filter: Option<IotaObjectDataFilter>,
748 cursor: Option<ObjectID>,
749 limit: usize,
750 ) -> Result<Vec<StoredObject>, IndexerError> {
751 self.spawn_blocking(move |this| this.get_owned_objects_impl(address, filter, cursor, limit))
752 .await
753 }
754
755 fn get_owned_objects_impl(
756 &self,
757 address: IotaAddress,
758 filter: Option<IotaObjectDataFilter>,
759 cursor: Option<ObjectID>,
760 limit: usize,
761 ) -> Result<Vec<StoredObject>, IndexerError> {
762 run_query!(&self.pool, |conn| {
763 let mut query = objects::dsl::objects
764 .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
765 .filter(objects::dsl::owner_id.eq(address.to_vec()))
766 .order(objects::dsl::object_id.asc())
767 .limit(limit as i64)
768 .into_boxed();
769 if let Some(filter) = filter {
770 match filter {
771 IotaObjectDataFilter::StructType(struct_tag) => {
772 let object_type =
773 struct_tag.to_canonical_string(true);
774 query =
775 query.filter(objects::object_type.like(format!("{}%", object_type)));
776 }
777 IotaObjectDataFilter::MatchAny(filters) => {
778 let mut condition = "(".to_string();
779 for (i, filter) in filters.iter().enumerate() {
780 if let IotaObjectDataFilter::StructType(struct_tag) = filter {
781 let object_type =
782 struct_tag.to_canonical_string(true);
783 if i == 0 {
784 condition +=
785 format!("objects.object_type LIKE '{}%'", object_type)
786 .as_str();
787 } else {
788 condition +=
789 format!(" OR objects.object_type LIKE '{}%'", object_type)
790 .as_str();
791 }
792 } else {
793 return Err(IndexerError::InvalidArgument(
794 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
795 ));
796 }
797 }
798 condition += ")";
799 query = query.filter(sql::<Bool>(&condition));
800 }
801 IotaObjectDataFilter::MatchNone(filters) => {
802 for filter in filters {
803 if let IotaObjectDataFilter::StructType(struct_tag) = filter {
804 let object_type =
805 struct_tag.to_canonical_string(true);
806 query = query.filter(
807 objects::object_type.not_like(format!("{}%", object_type)),
808 );
809 } else {
810 return Err(IndexerError::InvalidArgument(
811 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
812 ));
813 }
814 }
815 }
816 _ => {
817 return Err(IndexerError::InvalidArgument(
818 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
819 ));
820 }
821 }
822 }
823
824 if let Some(object_cursor) = cursor {
825 query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
826 }
827
828 query
829 .load::<StoredObject>(conn)
830 .map_err(|e| IndexerError::PostgresRead(e.to_string()))
831 })
832 }
833
834 fn filter_object_id_with_type(
835 &self,
836 object_ids: Vec<ObjectID>,
837 object_type: String,
838 ) -> Result<Vec<ObjectID>, IndexerError> {
839 let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
840 let filtered_ids = run_query!(&self.pool, |conn| {
841 objects::dsl::objects
842 .filter(objects::object_id.eq_any(object_ids))
843 .filter(objects::object_type.eq(object_type))
844 .select(objects::object_id)
845 .load::<Vec<u8>>(conn)
846 })?;
847
848 filtered_ids
849 .into_iter()
850 .map(|id| {
851 ObjectID::from_bytes(id.clone()).map_err(|_e| {
852 IndexerError::PersistentStorageDataCorruption(format!(
853 "Can't convert {:?} to ObjectID",
854 id,
855 ))
856 })
857 })
858 .collect::<Result<Vec<_>, _>>()
859 }
860
861 pub async fn multi_get_objects_in_blocking_task(
862 &self,
863 object_ids: Vec<ObjectID>,
864 ) -> Result<Vec<StoredObject>, IndexerError> {
865 self.spawn_blocking(move |this| this.multi_get_objects_impl(object_ids))
866 .await
867 }
868
869 fn multi_get_objects_impl(
870 &self,
871 object_ids: Vec<ObjectID>,
872 ) -> Result<Vec<StoredObject>, IndexerError> {
873 let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
874 run_query!(&self.pool, |conn| {
875 objects::dsl::objects
876 .filter(objects::object_id.eq_any(object_ids))
877 .load::<StoredObject>(conn)
878 })
879 }
880
881 async fn query_transaction_blocks_by_checkpoint_impl(
882 &self,
883 checkpoint_seq: u64,
884 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
885 cursor_tx_seq: Option<i64>,
886 limit: usize,
887 is_descending: bool,
888 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
889 let pool = self.get_pool();
890 let tx_range: (i64, i64) = run_query_async!(&pool, move |conn| {
891 pruner_cp_watermark::dsl::pruner_cp_watermark
892 .select((
893 pruner_cp_watermark::min_tx_sequence_number,
894 pruner_cp_watermark::max_tx_sequence_number,
895 ))
896 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
899 .first::<(i64, i64)>(conn)
900 })?;
901
902 let mut query = transactions::dsl::transactions
903 .filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
904 .into_boxed();
905
906 if let Some(cursor_tx_seq) = cursor_tx_seq {
908 if is_descending {
909 query = query.filter(transactions::dsl::tx_sequence_number.lt(cursor_tx_seq));
910 } else {
911 query = query.filter(transactions::dsl::tx_sequence_number.gt(cursor_tx_seq));
912 }
913 }
914 if is_descending {
915 query = query.order(transactions::dsl::tx_sequence_number.desc());
916 } else {
917 query = query.order(transactions::dsl::tx_sequence_number.asc());
918 }
919 let pool = self.get_pool();
920 let stored_txes = run_query_async!(&pool, move |conn| query
921 .limit(limit as i64)
922 .load::<StoredTransaction>(conn))?;
923
924 self.stored_transaction_to_transaction_block(stored_txes, options)
925 .await
926 }
927
928 pub async fn query_transaction_blocks_in_blocking_task(
929 &self,
930 filter: Option<TransactionFilter>,
931 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
932 cursor: Option<TransactionDigest>,
933 limit: usize,
934 is_descending: bool,
935 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
936 self.query_transaction_blocks_impl(filter, options, cursor, limit, is_descending)
937 .await
938 }
939
940 async fn query_transaction_blocks_impl(
941 &self,
942 filter: Option<TransactionFilter>,
943 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
944 cursor: Option<TransactionDigest>,
945 limit: usize,
946 is_descending: bool,
947 ) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
948 let cursor_tx_seq = if let Some(cursor) = cursor {
949 let pool = self.get_pool();
950 let tx_seq = run_query_async!(&pool, move |conn| {
951 tx_digests::table
952 .select(tx_digests::tx_sequence_number)
953 .filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
956 .first::<i64>(conn)
957 })?;
958 Some(tx_seq)
959 } else {
960 None
961 };
962 let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
963 if is_descending {
964 format!("AND {TX_SEQUENCE_NUMBER_STR} < {}", cursor_tx_seq)
965 } else {
966 format!("AND {TX_SEQUENCE_NUMBER_STR} > {}", cursor_tx_seq)
967 }
968 } else {
969 "".to_string()
970 };
971 let order_str = if is_descending { "DESC" } else { "ASC" };
972 let (table_name, main_where_clause) = match filter {
973 Some(TransactionFilter::Checkpoint(seq)) => {
975 return self
976 .query_transaction_blocks_by_checkpoint_impl(
977 seq,
978 options,
979 cursor_tx_seq,
980 limit,
981 is_descending,
982 )
983 .await;
984 }
985 Some(TransactionFilter::MoveFunction {
987 package,
988 module,
989 function,
990 }) => {
991 let package = Hex::encode(package.to_vec());
992 match (module, function) {
993 (Some(module), Some(function)) => (
994 "tx_calls_fun".into(),
995 format!(
996 "package = '\\x{}'::bytea AND module = '{}' AND func = '{}'",
997 package, module, function
998 ),
999 ),
1000 (Some(module), None) => (
1001 "tx_calls_mod".into(),
1002 format!(
1003 "package = '\\x{}'::bytea AND module = '{}'",
1004 package, module
1005 ),
1006 ),
1007 (None, Some(_)) => {
1008 return Err(IndexerError::InvalidArgument(
1009 "Function cannot be present without Module.".into(),
1010 ));
1011 }
1012 (None, None) => (
1013 "tx_calls_pkg".into(),
1014 format!("package = '\\x{}'::bytea", package),
1015 ),
1016 }
1017 }
1018 Some(TransactionFilter::InputObject(object_id)) => {
1019 let object_id = Hex::encode(object_id.to_vec());
1020 (
1021 "tx_input_objects".into(),
1022 format!("object_id = '\\x{}'::bytea", object_id),
1023 )
1024 }
1025 Some(TransactionFilter::ChangedObject(object_id)) => {
1026 let object_id = Hex::encode(object_id.to_vec());
1027 (
1028 "tx_changed_objects".into(),
1029 format!("object_id = '\\x{}'::bytea", object_id),
1030 )
1031 }
1032 Some(TransactionFilter::FromAddress(from_address)) => {
1033 let from_address = Hex::encode(from_address.to_vec());
1034 (
1035 "tx_senders".into(),
1036 format!("sender = '\\x{}'::bytea", from_address),
1037 )
1038 }
1039 Some(TransactionFilter::ToAddress(to_address)) => {
1040 let to_address = Hex::encode(to_address.to_vec());
1041 (
1042 "tx_recipients".into(),
1043 format!("recipient = '\\x{}'::bytea", to_address),
1044 )
1045 }
1046 Some(TransactionFilter::FromAndToAddress { from, to }) => {
1047 let from_address = Hex::encode(from.to_vec());
1048 let to_address = Hex::encode(to.to_vec());
1049 let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
1051 if is_descending {
1052 format!(
1053 "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} < {}",
1054 cursor_tx_seq
1055 )
1056 } else {
1057 format!(
1058 "AND tx_senders.{TX_SEQUENCE_NUMBER_STR} > {}",
1059 cursor_tx_seq
1060 )
1061 }
1062 } else {
1063 "".to_string()
1064 };
1065 let inner_query = format!(
1066 "(SELECT tx_senders.{TX_SEQUENCE_NUMBER_STR} \
1067 FROM tx_senders \
1068 JOIN tx_recipients \
1069 ON tx_senders.{TX_SEQUENCE_NUMBER_STR} = tx_recipients.{TX_SEQUENCE_NUMBER_STR} \
1070 WHERE tx_senders.sender = '\\x{}'::BYTEA \
1071 AND tx_recipients.recipient = '\\x{}'::BYTEA \
1072 {} \
1073 ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1074 LIMIT {}) AS inner_query
1075 ",
1076 from_address,
1077 to_address,
1078 cursor_clause,
1079 order_str,
1080 limit,
1081 );
1082 (inner_query, "1 = 1".into())
1083 }
1084 Some(TransactionFilter::FromOrToAddress { addr }) => {
1085 let address = Hex::encode(addr.to_vec());
1086 let inner_query = format!(
1087 "( \
1088 ( \
1089 SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_senders \
1090 WHERE sender = '\\x{}'::BYTEA {} \
1091 ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1092 LIMIT {} \
1093 ) \
1094 UNION \
1095 ( \
1096 SELECT {TX_SEQUENCE_NUMBER_STR} FROM tx_recipients \
1097 WHERE recipient = '\\x{}'::BYTEA {} \
1098 ORDER BY {TX_SEQUENCE_NUMBER_STR} {} \
1099 LIMIT {} \
1100 ) \
1101 ) AS combined",
1102 address,
1103 cursor_clause,
1104 order_str,
1105 limit,
1106 address,
1107 cursor_clause,
1108 order_str,
1109 limit,
1110 );
1111 (inner_query, "1 = 1".into())
1112 }
1113 Some(TransactionFilter::TransactionKind(kind)) => {
1114 if kind == IotaTransactionKind::SystemTransaction {
1117 ("tx_kinds".into(), "tx_kind != 1".to_string())
1118 } else {
1119 ("tx_kinds".into(), format!("tx_kind = {}", kind as u8))
1120 }
1121 }
1122 Some(TransactionFilter::TransactionKindIn(kind_vec)) => {
1123 if kind_vec.is_empty() {
1124 return Err(IndexerError::InvalidArgument(
1125 "no transaction kind provided".into(),
1126 ));
1127 }
1128
1129 let mut has_system_transaction = false;
1130 let mut has_programmable_transaction = false;
1131 let mut other_kinds = HashSet::new();
1132
1133 for kind in kind_vec.iter() {
1134 match kind {
1135 IotaTransactionKind::SystemTransaction => has_system_transaction = true,
1136 IotaTransactionKind::ProgrammableTransaction => {
1137 has_programmable_transaction = true
1138 }
1139 other => {
1140 other_kinds.insert(*other as u8);
1141 }
1142 }
1143 }
1144
1145 let query = if has_system_transaction {
1146 if !has_programmable_transaction {
1149 "tx_kind != 1".to_string()
1150 } else {
1151 "1 = 1".to_string()
1153 }
1154 } else {
1155 if has_programmable_transaction {
1157 other_kinds.insert(IotaTransactionKind::ProgrammableTransaction as u8);
1158 }
1159
1160 if other_kinds.is_empty() {
1161 "1 = 1".to_string()
1163 } else {
1164 let mut query = String::from("tx_kind IN (");
1165 query.push_str(
1166 &other_kinds
1167 .iter()
1168 .map(ToString::to_string)
1169 .collect::<Vec<_>>()
1170 .join(", "),
1171 );
1172 query.push(')');
1173 query
1174 }
1175 };
1176
1177 ("tx_kinds".into(), query)
1178 }
1179 None => {
1180 ("transactions".into(), "1 = 1".into())
1182 }
1183 };
1184
1185 let query = format!(
1186 "SELECT {TX_SEQUENCE_NUMBER_STR} FROM {} WHERE {} {} ORDER BY {TX_SEQUENCE_NUMBER_STR} {} LIMIT {}",
1187 table_name, main_where_clause, cursor_clause, order_str, limit,
1188 );
1189
1190 tracing::debug!("query transaction blocks: {}", query);
1191 let pool = self.get_pool();
1192 let tx_sequence_numbers = run_query_async!(&pool, move |conn| {
1193 diesel::sql_query(query.clone()).load::<TxSequenceNumber>(conn)
1194 })?
1195 .into_iter()
1196 .map(|tsn| tsn.tx_sequence_number)
1197 .collect::<Vec<i64>>();
1198 self.multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1199 tx_sequence_numbers,
1200 options,
1201 Some(is_descending),
1202 )
1203 .await
1204 }
1205
1206 async fn multi_get_transaction_block_response_in_blocking_task_impl(
1207 &self,
1208 digests: &[TransactionDigest],
1209 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1210 ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1211 let stored_txes = self
1212 .multi_get_transactions_in_blocking_task(digests.to_vec())
1213 .await?;
1214 self.stored_transaction_to_transaction_block(stored_txes, options)
1215 .await
1216 }
1217
1218 async fn multi_get_transaction_block_response_by_sequence_numbers_in_blocking_task(
1219 &self,
1220 tx_sequence_numbers: Vec<i64>,
1221 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1222 is_descending: Option<bool>,
1224 ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1225 let stored_txes: Vec<StoredTransaction> = self
1226 .spawn_blocking(move |this| {
1227 this.multi_get_transactions_with_sequence_numbers(
1228 tx_sequence_numbers,
1229 is_descending,
1230 )
1231 })
1232 .await?;
1233 self.stored_transaction_to_transaction_block(stored_txes, options)
1234 .await
1235 }
1236
1237 pub async fn multi_get_transaction_block_response_in_blocking_task(
1238 &self,
1239 digests: Vec<TransactionDigest>,
1240 options: iota_json_rpc_types::IotaTransactionBlockResponseOptions,
1241 ) -> Result<Vec<iota_json_rpc_types::IotaTransactionBlockResponse>, IndexerError> {
1242 self.multi_get_transaction_block_response_in_blocking_task_impl(&digests, options)
1243 .await
1244 }
1245
1246 pub async fn get_transaction_events_in_blocking_task(
1247 &self,
1248 digest: TransactionDigest,
1249 ) -> Result<Vec<iota_json_rpc_types::IotaEvent>, IndexerError> {
1250 let pool = self.get_pool();
1251 let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| {
1252 transactions::table
1253 .filter(
1254 transactions::tx_sequence_number
1255 .nullable()
1256 .eq(tx_digests::table
1257 .select(tx_digests::tx_sequence_number)
1258 .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
1261 .single_value()),
1262 )
1263 .select((transactions::timestamp_ms, transactions::events))
1264 .first::<(i64, StoredTransactionEvents)>(conn)
1265 })?;
1266
1267 let events = stored_events_to_events(serialized_events)?;
1268 let tx_events = TransactionEvents { data: events };
1269
1270 let iota_tx_events = tx_events_to_iota_tx_events(
1271 tx_events,
1272 self.package_resolver(),
1273 digest,
1274 timestamp_ms as u64,
1275 )
1276 .await?;
1277 Ok(iota_tx_events.map_or(vec![], |ste| ste.data))
1278 }
1279
1280 async fn query_events_by_tx_digest(
1281 &self,
1282 tx_digest: TransactionDigest,
1283 cursor: Option<EventID>,
1284 limit: usize,
1285 descending_order: bool,
1286 ) -> IndexerResult<Vec<IotaEvent>> {
1287 let mut query = events::table.into_boxed();
1288
1289 if let Some(cursor) = cursor {
1290 if cursor.tx_digest != tx_digest {
1291 return Err(IndexerError::InvalidArgument(
1292 "Cursor tx_digest does not match the tx_digest in the query.".into(),
1293 ));
1294 }
1295 if descending_order {
1296 query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
1297 } else {
1298 query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
1299 }
1300 } else if descending_order {
1301 query = query.filter(events::event_sequence_number.le(i64::MAX));
1302 } else {
1303 query = query.filter(events::event_sequence_number.ge(0));
1304 };
1305
1306 if descending_order {
1307 query = query.order(events::event_sequence_number.desc());
1308 } else {
1309 query = query.order(events::event_sequence_number.asc());
1310 }
1311
1312 query = query.filter(
1313 events::tx_sequence_number.nullable().eq(tx_digests::table
1314 .select(tx_digests::tx_sequence_number)
1315 .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1318 .single_value()),
1319 );
1320
1321 let pool = self.get_pool();
1322 let stored_events = run_query_async!(&pool, move |conn| {
1323 query.limit(limit as i64).load::<StoredEvent>(conn)
1324 })?;
1325
1326 let mut iota_event_futures = vec![];
1327 for stored_event in stored_events {
1328 iota_event_futures.push(tokio::task::spawn(
1329 stored_event.try_into_iota_event(self.package_resolver.clone()),
1330 ));
1331 }
1332
1333 let iota_events = futures::future::join_all(iota_event_futures)
1334 .await
1335 .into_iter()
1336 .collect::<Result<Vec<_>, _>>()
1337 .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
1338 .into_iter()
1339 .collect::<Result<Vec<_>, _>>()
1340 .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
1341 Ok(iota_events)
1342 }
1343
1344 pub async fn query_events_in_blocking_task(
1345 &self,
1346 filter: EventFilter,
1347 cursor: Option<EventID>,
1348 limit: usize,
1349 descending_order: bool,
1350 ) -> IndexerResult<Vec<IotaEvent>> {
1351 let pool = self.get_pool();
1352 let (tx_seq, event_seq) = if let Some(cursor) = cursor {
1353 let EventID {
1354 tx_digest,
1355 event_seq,
1356 } = cursor;
1357 let tx_seq = run_query_async!(&pool, move |conn| {
1358 transactions::dsl::transactions
1359 .select(transactions::tx_sequence_number)
1360 .filter(
1361 transactions::tx_sequence_number
1362 .nullable()
1363 .eq(tx_digests::table
1364 .select(tx_digests::tx_sequence_number)
1365 .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
1368 .single_value()),
1369 )
1370 .first::<i64>(conn)
1371 })?;
1372 (tx_seq, event_seq as i64)
1373 } else if descending_order {
1374 let max_tx_seq = i64::MAX;
1375 let max_event_seq = i64::MAX;
1376 (max_tx_seq, max_event_seq)
1377 } else {
1378 (-1, 0)
1379 };
1380
1381 let query = if let EventFilter::Sender(sender) = &filter {
1382 let cursor_clause = if descending_order {
1384 format!(
1385 "(e.{TX_SEQUENCE_NUMBER_STR} < {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} < {}))",
1386 tx_seq, tx_seq, event_seq
1387 )
1388 } else {
1389 format!(
1390 "(e.{TX_SEQUENCE_NUMBER_STR} > {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} > {}))",
1391 tx_seq, tx_seq, event_seq
1392 )
1393 };
1394 let order_clause = if descending_order {
1395 format!("e.{TX_SEQUENCE_NUMBER_STR} DESC, e.{EVENT_SEQUENCE_NUMBER_STR} DESC")
1396 } else {
1397 format!("e.{TX_SEQUENCE_NUMBER_STR} ASC, e.{EVENT_SEQUENCE_NUMBER_STR} ASC")
1398 };
1399 format!(
1400 "( \
1401 SELECT *
1402 FROM tx_senders s
1403 JOIN events e
1404 ON e.tx_sequence_number = s.tx_sequence_number
1405 AND s.sender = '\\x{}'::bytea
1406 WHERE {} \
1407 ORDER BY {} \
1408 LIMIT {}
1409 )",
1410 Hex::encode(sender.to_vec()),
1411 cursor_clause,
1412 order_clause,
1413 limit,
1414 )
1415 } else if let EventFilter::Transaction(tx_digest) = filter {
1416 return self
1417 .query_events_by_tx_digest(tx_digest, cursor, limit, descending_order)
1418 .await;
1419 } else {
1420 let main_where_clause = match filter {
1421 EventFilter::Package(package_id) => {
1422 format!("package = '\\x{}'::bytea", package_id.to_hex())
1423 }
1424 EventFilter::MoveModule { package, module } => {
1425 format!(
1426 "package = '\\x{}'::bytea AND module = '{}'",
1427 package.to_hex(),
1428 module,
1429 )
1430 }
1431 EventFilter::MoveEventType(struct_tag) => {
1432 let formatted_struct_tag = struct_tag.to_canonical_string(true);
1433 format!("event_type = '{formatted_struct_tag}'")
1434 }
1435 EventFilter::MoveEventModule { package, module } => {
1436 let package_module_prefix = format!("{}::{}", package.to_hex_literal(), module);
1437 format!("event_type LIKE '{package_module_prefix}::%'")
1438 }
1439 EventFilter::Sender(_) => {
1440 unreachable!()
1442 }
1443 EventFilter::Transaction(_) => {
1444 unreachable!()
1446 }
1447 EventFilter::MoveEventField { .. }
1448 | EventFilter::All(_)
1449 | EventFilter::Any(_)
1450 | EventFilter::And(_, _)
1451 | EventFilter::Or(_, _)
1452 | EventFilter::TimeRange { .. } => {
1453 return Err(IndexerError::NotSupported(
1454 "This type of EventFilter is not supported.".into(),
1455 ));
1456 }
1457 };
1458
1459 let cursor_clause = if descending_order {
1460 format!(
1461 "AND ({TX_SEQUENCE_NUMBER_STR} < {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} < {}))",
1462 tx_seq, tx_seq, event_seq
1463 )
1464 } else {
1465 format!(
1466 "AND ({TX_SEQUENCE_NUMBER_STR} > {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} > {}))",
1467 tx_seq, tx_seq, event_seq
1468 )
1469 };
1470 let order_clause = if descending_order {
1471 format!("{TX_SEQUENCE_NUMBER_STR} DESC, {EVENT_SEQUENCE_NUMBER_STR} DESC")
1472 } else {
1473 format!("{TX_SEQUENCE_NUMBER_STR} ASC, {EVENT_SEQUENCE_NUMBER_STR} ASC")
1474 };
1475
1476 format!(
1477 "
1478 SELECT * FROM events \
1479 WHERE {} {} \
1480 ORDER BY {} \
1481 LIMIT {}
1482 ",
1483 main_where_clause, cursor_clause, order_clause, limit,
1484 )
1485 };
1486 tracing::debug!("query events: {}", query);
1487 let pool = self.get_pool();
1488 let stored_events = run_query_async!(&pool, move |conn| diesel::sql_query(query)
1489 .load::<StoredEvent>(conn))?;
1490
1491 let mut iota_event_futures = vec![];
1492 for stored_event in stored_events {
1493 iota_event_futures.push(tokio::task::spawn(
1494 stored_event.try_into_iota_event(self.package_resolver.clone()),
1495 ));
1496 }
1497
1498 let iota_events = futures::future::join_all(iota_event_futures)
1499 .await
1500 .into_iter()
1501 .collect::<Result<Vec<_>, _>>()
1502 .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
1503 .into_iter()
1504 .collect::<Result<Vec<_>, _>>()
1505 .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
1506 Ok(iota_events)
1507 }
1508
1509 pub async fn get_dynamic_fields_in_blocking_task(
1510 &self,
1511 parent_object_id: ObjectID,
1512 cursor: Option<ObjectID>,
1513 limit: usize,
1514 ) -> Result<Vec<DynamicFieldInfo>, IndexerError> {
1515 let stored_objects = self
1516 .spawn_blocking(move |this| {
1517 this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1518 })
1519 .await?;
1520
1521 let mut df_futures = vec![];
1522 let indexer_reader_arc = Arc::new(self.clone());
1523 for stored_object in stored_objects {
1524 let indexer_reader_arc_clone = Arc::clone(&indexer_reader_arc);
1525 df_futures.push(tokio::task::spawn(async move {
1526 indexer_reader_arc_clone
1527 .try_create_dynamic_field_info(stored_object)
1528 .await
1529 }));
1530 }
1531 let df_infos = futures::future::try_join_all(df_futures)
1532 .await
1533 .tap_err(|e| tracing::error!("Error joining DF futures: {:?}", e))?
1534 .into_iter()
1535 .collect::<Result<Vec<_>, _>>()
1536 .tap_err(|e| {
1537 tracing::error!(
1538 "Error calling DF try_create_dynamic_field_info function: {:?}",
1539 e
1540 )
1541 })?
1542 .into_iter()
1543 .flatten()
1544 .collect::<Vec<_>>();
1545 Ok(df_infos)
1546 }
1547
1548 pub async fn get_dynamic_fields_raw_in_blocking_task(
1549 &self,
1550 parent_object_id: ObjectID,
1551 cursor: Option<ObjectID>,
1552 limit: usize,
1553 ) -> Result<Vec<StoredObject>, IndexerError> {
1554 self.spawn_blocking(move |this| {
1555 this.get_dynamic_fields_raw(parent_object_id, cursor, limit)
1556 })
1557 .await
1558 }
1559
1560 fn get_dynamic_fields_raw(
1561 &self,
1562 parent_object_id: ObjectID,
1563 cursor: Option<ObjectID>,
1564 limit: usize,
1565 ) -> Result<Vec<StoredObject>, IndexerError> {
1566 let objects: Vec<StoredObject> = run_query!(&self.pool, |conn| {
1567 let mut query = objects::dsl::objects
1568 .filter(objects::dsl::owner_type.eq(OwnerType::Object as i16))
1569 .filter(objects::dsl::owner_id.eq(parent_object_id.to_vec()))
1570 .order(objects::dsl::object_id.asc())
1571 .limit(limit as i64)
1572 .into_boxed();
1573 if let Some(object_cursor) = cursor {
1574 query = query.filter(objects::dsl::object_id.gt(object_cursor.to_vec()));
1575 }
1576 query.load::<StoredObject>(conn)
1577 })?;
1578
1579 Ok(objects)
1580 }
1581
1582 async fn try_create_dynamic_field_info(
1583 &self,
1584 stored_object: StoredObject,
1585 ) -> Result<Option<DynamicFieldInfo>, IndexerError> {
1586 if stored_object.df_kind.is_none() {
1587 return Ok(None);
1588 }
1589
1590 let object: Object = stored_object.try_into()?;
1591 let Some(move_object) = object.data.try_as_move().cloned() else {
1592 return Err(IndexerError::ResolveMoveStruct(
1593 "Object is not a MoveObject".to_string(),
1594 ));
1595 };
1596 let type_tag: TypeTag = move_object.type_().clone().into();
1597 let layout = self
1598 .package_resolver
1599 .type_layout(type_tag.clone())
1600 .await
1601 .map_err(|e| {
1602 IndexerError::ResolveMoveStruct(format!(
1603 "Failed to get type layout for type {}: {e}",
1604 type_tag.to_canonical_display(true),
1605 ))
1606 })?;
1607
1608 let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
1609 .tap_err(|e| tracing::warn!("{e}"))?;
1610
1611 let type_ = field.kind;
1612 let name_type: TypeTag = field.name_layout.into();
1613 let bcs_name = field.name_bytes.to_owned();
1614
1615 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
1616 .tap_err(|e| tracing::warn!("{e}"))?;
1617
1618 let name = DynamicFieldName {
1619 type_: name_type,
1620 value: IotaMoveValue::from(name_value).to_json_value(),
1621 };
1622
1623 let value_metadata = field.value_metadata().map_err(|e| {
1624 tracing::warn!("{e}");
1625 IndexerError::Uncategorized(anyhow!(e))
1626 })?;
1627
1628 Ok(Some(match value_metadata {
1629 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
1630 name,
1631 bcs_name,
1632 type_,
1633 object_type: object_type.to_canonical_string(true),
1634 object_id: object.id(),
1635 version: object.version(),
1636 digest: object.digest(),
1637 },
1638
1639 DFV::ValueMetadata::DynamicObjectField(object_id) => {
1640 let object = self
1641 .get_object_in_blocking_task(object_id)
1642 .await?
1643 .ok_or_else(|| {
1644 IndexerError::Uncategorized(anyhow!(
1645 "Failed to find object_id {} when trying to create dynamic field info",
1646 object_id.to_canonical_display(true),
1647 ))
1648 })?;
1649
1650 let object_type = object.data.type_().unwrap().clone();
1651 DynamicFieldInfo {
1652 name,
1653 bcs_name,
1654 type_,
1655 object_type: object_type.to_canonical_string(true),
1656 object_id,
1657 version: object.version(),
1658 digest: object.digest(),
1659 }
1660 }
1661 }))
1662 }
1663
1664 pub async fn bcs_name_from_dynamic_field_name(
1665 &self,
1666 name: &DynamicFieldName,
1667 ) -> Result<Vec<u8>, IndexerError> {
1668 let move_type_layout = self
1669 .package_resolver()
1670 .type_layout(name.type_.clone())
1671 .await
1672 .map_err(|e| {
1673 IndexerError::ResolveMoveStruct(format!(
1674 "Failed to get type layout for type {}: {}",
1675 name.type_, e
1676 ))
1677 })?;
1678 let iota_json_value = iota_json::IotaJsonValue::new(name.value.clone())?;
1679 let name_bcs_value = iota_json_value.to_bcs_bytes(&move_type_layout)?;
1680 Ok(name_bcs_value)
1681 }
1682
1683 pub async fn get_display_object_by_type(
1684 &self,
1685 object_type: &move_core_types::language_storage::StructTag,
1686 ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1687 let object_type = object_type.to_canonical_string(true);
1688 self.spawn_blocking(move |this| this.get_display_update_event(object_type))
1689 .await
1690 }
1691
1692 fn get_display_update_event(
1693 &self,
1694 object_type: String,
1695 ) -> Result<Option<iota_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
1696 let stored_display = run_query!(&self.pool, |conn| {
1697 display::table
1698 .filter(display::object_type.eq(object_type))
1699 .first::<StoredDisplay>(conn)
1700 .optional()
1701 })?;
1702
1703 let stored_display = match stored_display {
1704 Some(display) => display,
1705 None => return Ok(None),
1706 };
1707
1708 let display_update = stored_display.to_display_update_event()?;
1709
1710 Ok(Some(display_update))
1711 }
1712
1713 pub async fn get_owned_coins_in_blocking_task(
1714 &self,
1715 owner: IotaAddress,
1716 coin_type: Option<String>,
1717 cursor: ObjectID,
1718 limit: usize,
1719 ) -> Result<Vec<IotaCoin>, IndexerError> {
1720 self.spawn_blocking(move |this| this.get_owned_coins(owner, coin_type, cursor, limit))
1721 .await
1722 }
1723
1724 fn get_owned_coins(
1725 &self,
1726 owner: IotaAddress,
1727 coin_type: Option<String>,
1729 cursor: ObjectID,
1730 limit: usize,
1731 ) -> Result<Vec<IotaCoin>, IndexerError> {
1732 let mut query = objects::dsl::objects
1733 .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
1734 .filter(objects::dsl::owner_id.eq(owner.to_vec()))
1735 .filter(objects::dsl::object_id.gt(cursor.to_vec()))
1736 .into_boxed();
1737 if let Some(coin_type) = coin_type {
1738 query = query.filter(objects::dsl::coin_type.eq(Some(coin_type)));
1739 } else {
1740 query = query.filter(objects::dsl::coin_type.is_not_null());
1741 }
1742 query = query
1743 .order(objects::dsl::object_id.asc())
1744 .limit(limit as i64);
1745
1746 let stored_objects = run_query!(&self.pool, |conn| query.load::<StoredObject>(conn))?;
1747
1748 stored_objects
1749 .into_iter()
1750 .map(|o| o.try_into())
1751 .collect::<IndexerResult<Vec<_>>>()
1752 }
1753
1754 pub async fn get_coin_balances_in_blocking_task(
1755 &self,
1756 owner: IotaAddress,
1757 coin_type: Option<String>,
1759 ) -> Result<Vec<Balance>, IndexerError> {
1760 self.spawn_blocking(move |this| this.get_coin_balances(owner, coin_type))
1761 .await
1762 }
1763
1764 fn get_coin_balances(
1765 &self,
1766 owner: IotaAddress,
1767 coin_type: Option<String>,
1769 ) -> Result<Vec<Balance>, IndexerError> {
1770 let coin_type_filter = if let Some(coin_type) = coin_type {
1771 format!("= '{}'", coin_type)
1772 } else {
1773 "IS NOT NULL".to_string()
1774 };
1775 let query = format!(
1777 "
1778 SELECT coin_type, \
1779 CAST(COUNT(*) AS BIGINT) AS coin_num, \
1780 CAST(SUM(coin_balance) AS BIGINT) AS coin_balance \
1781 FROM objects \
1782 WHERE owner_type = {} \
1783 AND owner_id = '\\x{}'::BYTEA \
1784 AND coin_type {} \
1785 GROUP BY coin_type \
1786 ORDER BY coin_type ASC
1787 ",
1788 OwnerType::Address as i16,
1789 Hex::encode(owner.to_vec()),
1790 coin_type_filter,
1791 );
1792
1793 tracing::debug!("get coin balances query: {query}");
1794 let coin_balances = run_query!(&self.pool, |conn| diesel::sql_query(query)
1795 .load::<CoinBalance>(conn))?;
1796 coin_balances
1797 .into_iter()
1798 .map(|cb| cb.try_into())
1799 .collect::<IndexerResult<Vec<_>>>()
1800 }
1801
1802 pub fn get_latest_network_metrics(&self) -> IndexerResult<NetworkMetrics> {
1803 let mut metrics = run_query!(&self.pool, |conn| {
1804 diesel::sql_query("SELECT * FROM network_metrics;")
1805 .get_result::<StoredNetworkMetrics>(conn)
1806 })?;
1807 if metrics.total_addresses == -1 {
1808 metrics.total_addresses = run_query!(&self.pool, |conn| {
1811 addresses::dsl::addresses.count().get_result::<i64>(conn)
1812 })?;
1813 }
1814 if metrics.total_packages == -1 {
1815 metrics.total_packages = run_query!(&self.pool, |conn| {
1818 packages::dsl::packages.count().get_result::<i64>(conn)
1819 })?;
1820 }
1821 Ok(metrics.into())
1822 }
1823
1824 pub fn get_latest_move_call_metrics(&self) -> IndexerResult<MoveCallMetrics> {
1826 let latest_3_days = self.get_latest_move_call_metrics_by_day(3)?;
1827 let latest_7_days = self.get_latest_move_call_metrics_by_day(7)?;
1828 let latest_30_days = self.get_latest_move_call_metrics_by_day(30)?;
1829
1830 let rank_3_days = latest_3_days
1832 .into_iter()
1833 .sorted_by(|a, b| b.1.cmp(&a.1))
1834 .collect::<Vec<_>>();
1835 let rank_7_days = latest_7_days
1836 .into_iter()
1837 .sorted_by(|a, b| b.1.cmp(&a.1))
1838 .collect::<Vec<_>>();
1839 let rank_30_days = latest_30_days
1840 .into_iter()
1841 .sorted_by(|a, b| b.1.cmp(&a.1))
1842 .collect::<Vec<_>>();
1843
1844 Ok(MoveCallMetrics {
1845 rank_3_days,
1846 rank_7_days,
1847 rank_30_days,
1848 })
1849 }
1850
1851 pub fn get_latest_move_call_metrics_by_day(
1853 &self,
1854 day_value: i64,
1855 ) -> IndexerResult<Vec<(MoveFunctionName, usize)>> {
1856 let query = "
1857 SELECT id, epoch, day, move_package, move_module, move_function, count
1858 FROM move_call_metrics
1859 WHERE day = $1
1860 AND epoch = (SELECT MAX(epoch) FROM move_call_metrics WHERE day = $1)
1861 ORDER BY count DESC
1862 LIMIT 10
1863 ";
1864
1865 let queried_metrics = run_query!(&self.pool, |conn| sql_query(query)
1866 .bind::<BigInt, _>(day_value)
1867 .load::<QueriedMoveCallMetrics>(conn))?;
1868
1869 let metrics = queried_metrics
1870 .into_iter()
1871 .map(|m| {
1872 m.try_into()
1873 .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))
1874 })
1875 .collect::<Result<Vec<_>, _>>()?;
1876
1877 Ok(metrics)
1878 }
1879
1880 pub fn get_latest_address_metrics(&self) -> IndexerResult<AddressMetrics> {
1881 let stored_address_metrics = run_query!(&self.pool, |conn| {
1882 address_metrics::table
1883 .order(address_metrics::dsl::checkpoint.desc())
1884 .first::<StoredAddressMetrics>(conn)
1885 })?;
1886 Ok(stored_address_metrics.into())
1887 }
1888
1889 pub fn get_checkpoint_address_metrics(
1890 &self,
1891 checkpoint_seq: u64,
1892 ) -> IndexerResult<AddressMetrics> {
1893 let stored_address_metrics = run_query!(&self.pool, |conn| {
1894 address_metrics::table
1895 .filter(address_metrics::dsl::checkpoint.eq(checkpoint_seq as i64))
1896 .first::<StoredAddressMetrics>(conn)
1897 })?;
1898 Ok(stored_address_metrics.into())
1899 }
1900
1901 pub fn get_all_epoch_address_metrics(
1902 &self,
1903 descending_order: Option<bool>,
1904 ) -> IndexerResult<Vec<AddressMetrics>> {
1905 let is_descending = descending_order.unwrap_or_default();
1906 let epoch_address_metrics_query = format!(
1907 "WITH ranked_rows AS (
1908 SELECT
1909 checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses,
1910 row_number() OVER(PARTITION BY epoch ORDER BY checkpoint DESC) as row_num
1911 FROM
1912 address_metrics
1913 )
1914 SELECT
1915 checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses
1916 FROM ranked_rows
1917 WHERE row_num = 1 ORDER BY epoch {}",
1918 if is_descending { "DESC" } else { "ASC" },
1919 );
1920 let epoch_address_metrics = run_query!(&self.pool, |conn| {
1921 diesel::sql_query(epoch_address_metrics_query).load::<StoredAddressMetrics>(conn)
1922 })?;
1923
1924 Ok(epoch_address_metrics
1925 .into_iter()
1926 .map(|stored_address_metrics| stored_address_metrics.into())
1927 .collect())
1928 }
1929
1930 pub(crate) async fn get_display_fields(
1931 &self,
1932 original_object: &iota_types::object::Object,
1933 original_layout: &Option<MoveStructLayout>,
1934 ) -> Result<DisplayFieldsResponse, IndexerError> {
1935 let (object_type, layout) = if let Some((object_type, layout)) =
1936 iota_json_rpc::read_api::get_object_type_and_struct(original_object, original_layout)
1937 .map_err(|e| IndexerError::Generic(e.to_string()))?
1938 {
1939 (object_type, layout)
1940 } else {
1941 return Ok(DisplayFieldsResponse {
1942 data: None,
1943 error: None,
1944 });
1945 };
1946
1947 if let Some(display_object) = self.get_display_object_by_type(&object_type).await? {
1948 return iota_json_rpc::read_api::get_rendered_fields(display_object.fields, &layout)
1949 .map_err(|e| IndexerError::Generic(e.to_string()));
1950 }
1951 Ok(DisplayFieldsResponse {
1952 data: None,
1953 error: None,
1954 })
1955 }
1956
1957 pub async fn get_coin_metadata_in_blocking_task(
1958 &self,
1959 coin_struct: StructTag,
1960 ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1961 self.spawn_blocking(move |this| this.get_coin_metadata(coin_struct))
1962 .await
1963 }
1964
1965 fn get_coin_metadata(
1966 &self,
1967 coin_struct: StructTag,
1968 ) -> Result<Option<IotaCoinMetadata>, IndexerError> {
1969 let package_id = coin_struct.address.into();
1970 let coin_metadata_type =
1971 CoinMetadata::type_(coin_struct).to_canonical_string(true);
1972 let coin_metadata_obj_id = *self
1973 .package_obj_type_cache
1974 .lock()
1975 .unwrap()
1976 .cache_get_or_set_with(format!("{}{}", package_id, coin_metadata_type), || {
1977 get_single_obj_id_from_package_publish(self, package_id, coin_metadata_type.clone())
1978 .unwrap()
1979 });
1980 if let Some(id) = coin_metadata_obj_id {
1981 let metadata_object = self.get_object(&id, None)?;
1982 Ok(metadata_object.and_then(|v| IotaCoinMetadata::try_from(v).ok()))
1983 } else {
1984 Ok(None)
1985 }
1986 }
1987
1988 pub async fn get_total_supply_in_blocking_task(
1989 &self,
1990 coin_struct: StructTag,
1991 ) -> Result<Supply, IndexerError> {
1992 self.spawn_blocking(move |this| this.get_total_supply(coin_struct))
1993 .await
1994 }
1995
1996 fn get_total_supply(&self, coin_struct: StructTag) -> Result<Supply, IndexerError> {
1997 let package_id = coin_struct.address.into();
1998 let treasury_cap_type =
1999 TreasuryCap::type_(coin_struct).to_canonical_string(true);
2000 let treasury_cap_obj_id = self
2001 .package_obj_type_cache
2002 .lock()
2003 .unwrap()
2004 .cache_get_or_set_with(format!("{}{}", package_id, treasury_cap_type), || {
2005 get_single_obj_id_from_package_publish(self, package_id, treasury_cap_type.clone())
2006 .unwrap()
2007 })
2008 .ok_or(IndexerError::Generic(format!(
2009 "Cannot find treasury cap for type {}",
2010 treasury_cap_type
2011 )))?;
2012 let treasury_cap_obj_object =
2013 self.get_object(&treasury_cap_obj_id, None)?
2014 .ok_or(IndexerError::Generic(format!(
2015 "Cannot find treasury cap object with id {}",
2016 treasury_cap_obj_id
2017 )))?;
2018 Ok(TreasuryCap::try_from(treasury_cap_obj_object)?.total_supply)
2019 }
2020
2021 pub fn get_consistent_read_range(&self) -> Result<(i64, i64), IndexerError> {
2022 let latest_checkpoint_sequence = run_query!(&self.pool, |conn| {
2023 checkpoints::table
2024 .select(checkpoints::sequence_number)
2025 .order(checkpoints::sequence_number.desc())
2026 .first::<i64>(conn)
2027 .optional()
2028 })?
2029 .unwrap_or_default();
2030 let latest_object_snapshot_checkpoint_sequence = run_query!(&self.pool, |conn| {
2031 objects_snapshot::table
2032 .select(objects_snapshot::checkpoint_sequence_number)
2033 .order(objects_snapshot::checkpoint_sequence_number.desc())
2034 .first::<i64>(conn)
2035 .optional()
2036 })?
2037 .unwrap_or_default();
2038 Ok((
2039 latest_object_snapshot_checkpoint_sequence,
2040 latest_checkpoint_sequence,
2041 ))
2042 }
2043
2044 pub fn package_resolver(&self) -> PackageResolver {
2045 self.package_resolver.clone()
2046 }
2047
2048 pub async fn pending_active_validators(
2049 &self,
2050 ) -> Result<Vec<IotaValidatorSummary>, IndexerError> {
2051 self.spawn_blocking(move |this| {
2052 iota_types::iota_system_state::get_iota_system_state(&this)
2053 .and_then(|system_state| system_state.get_pending_active_validators(&this))
2054 })
2055 .await
2056 .map_err(Into::into)
2057 }
2058
2059 pub fn get_participation_metrics(&self) -> IndexerResult<ParticipationMetrics> {
2063 run_query!(&self.pool, |conn| {
2064 diesel::sql_query("SELECT * FROM participation_metrics")
2065 .get_result::<StoredParticipationMetrics>(conn)
2066 })
2067 .map(Into::into)
2068 }
2069}
2070
2071impl iota_types::storage::ObjectStore for IndexerReader {
2072 fn get_object(
2073 &self,
2074 object_id: &ObjectID,
2075 ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2076 self.get_object(object_id, None)
2077 .map_err(iota_types::storage::error::Error::custom)
2078 }
2079
2080 fn get_object_by_key(
2081 &self,
2082 object_id: &ObjectID,
2083 version: iota_types::base_types::VersionNumber,
2084 ) -> Result<Option<iota_types::object::Object>, iota_types::storage::error::Error> {
2085 self.get_object(object_id, Some(version))
2086 .map_err(iota_types::storage::error::Error::custom)
2087 }
2088}
2089
2090fn get_single_obj_id_from_package_publish(
2091 reader: &IndexerReader,
2092 package_id: ObjectID,
2093 obj_type: String,
2094) -> Result<Option<ObjectID>, IndexerError> {
2095 let publish_txn_effects_opt = if is_system_package(package_id) {
2096 Some(reader.get_transaction_effects_with_sequence_number(0))
2097 } else {
2098 reader.get_object(&package_id, None)?.map(|o| {
2099 let publish_txn_digest = o.previous_transaction;
2100 reader.get_transaction_effects_with_digest(publish_txn_digest)
2101 })
2102 };
2103 if let Some(publish_txn_effects) = publish_txn_effects_opt {
2104 let created_objs = publish_txn_effects?
2105 .created()
2106 .iter()
2107 .map(|o| o.object_id())
2108 .collect::<Vec<_>>();
2109 let obj_ids_with_type =
2110 reader.filter_object_id_with_type(created_objs, obj_type.clone())?;
2111 if obj_ids_with_type.len() == 1 {
2112 Ok(Some(obj_ids_with_type[0]))
2113 } else if obj_ids_with_type.is_empty() {
2114 Ok(None)
2117 } else {
2118 tracing::error!(
2121 "There are more than one objects found for type {}",
2122 obj_type
2123 );
2124 Ok(None)
2125 }
2126 } else {
2127 Ok(None)
2129 }
2130}