use core::result::Result::Ok;
use std::{
    any::Any as StdAny,
    collections::{BTreeMap, HashMap},
    time::Duration,
};

use async_trait::async_trait;
use diesel::{
    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
    dsl::{max, min, sql},
    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
    upsert::excluded,
};
use downcast::Any;
use iota_protocol_config::ProtocolConfig;
use iota_types::{
    base_types::ObjectID,
    digests::{ChainIdentifier, CheckpointDigest},
    messages_checkpoint::CheckpointSequenceNumber,
};
use itertools::Itertools;
use strum::IntoEnumIterator;
use tap::TapFallible;
use tracing::info;

use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
use crate::{
    blocking_call_is_ok_or_panic,
    db::ConnectionPool,
    errors::{Context, IndexerError, IndexerResult},
    ingestion::{
        common::{
            persist::{CommitterWatermark, ObjectsSnapshotHandlerTables},
            prepare::{
                CheckpointObjectChanges, LiveObject, RemovedObject,
                retain_latest_objects_from_checkpoint_batch,
            },
        },
        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
    },
    insert_or_ignore_into,
    metrics::IndexerMetrics,
    models::{
        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
        display::StoredDisplay,
        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
        events::StoredEvent,
        obj_indices::StoredObjectVersion,
        objects::{
            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
            StoredObjects,
        },
        packages::StoredPackage,
        transactions::{
            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
        },
        tx_indices::TxIndexSplit,
        watermarks::StoredWatermark,
    },
    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
    persist_chunk_into_table_in_existing_connection,
    pruning::pruner::PrunableTable,
    read_only_blocking, run_query, run_query_with_retry,
    schema::{
        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
        tx_wrapped_or_deleted_objects, watermarks,
    },
    store::IndexerStore,
    transactional_blocking_with_retry,
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, TxIndex,
    },
};
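
/// Cursor identifying a transaction's position in `tx_global_order` as the
/// pair of sequence numbers that the pruning queries compare row-wise.
///
/// Illustrative construction (the values here are made up):
///
/// ```ignore
/// let cursor = TxGlobalOrderCursor {
///     global_sequence_number: 1_000,
///     optimistic_sequence_number: 0,
/// };
/// ```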
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TxGlobalOrderCursor {
    pub global_sequence_number: i64,
    pub optimistic_sequence_number: i64,
}
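
/// Splits `$data` into owned chunks of at most `$size` elements, collecting
/// eagerly into a `Vec<Vec<_>>` so each chunk can be moved into its own
/// blocking task.
///
/// Illustrative usage:
///
/// ```ignore
/// let chunks = chunk!(vec![1, 2, 3, 4, 5], 2); // [[1, 2], [3, 4], [5]]
/// ```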
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}
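
/// Deletes all rows of `$table` whose `tx_sequence_number` falls in the
/// inclusive range `[$min_tx, $max_tx]`, early-returning (`?`) from the
/// enclosing function with `$context_msg` attached on failure.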
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}
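
// Chunk sizes and retry sleep shared by the commit paths below. The two
// `*_PARALLEL_*` values are compile-time defaults that `PgIndexerStore::new`
// lets operators override through environment variables of the same name.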
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
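
/// Chunk sizes for the parallel commit tasks; see [`PgIndexerStore::new`] for
/// how these are resolved from the environment.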
#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    pub parallel_chunk_size: usize,
    pub parallel_objects_chunk_size: usize,
}

pub struct PgIndexerStore {
    blocking_cp: ConnectionPool,
    metrics: IndexerMetrics,
    partition_manager: PgPartitionManager,
    config: PgIndexerStoreConfig,
}

impl Clone for PgIndexerStore {
    fn clone(&self) -> PgIndexerStore {
        Self {
            blocking_cp: self.blocking_cp.clone(),
            metrics: self.metrics.clone(),
            partition_manager: self.partition_manager.clone(),
            config: self.config.clone(),
        }
    }
}

impl PgIndexerStore {
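    /// Builds the store, reading `PG_COMMIT_PARALLEL_CHUNK_SIZE` and
    /// `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` from the environment and
    /// falling back to the compile-time defaults.
    ///
    /// Panics if an override is set but does not parse as `usize`, or if the
    /// partition manager cannot be initialized. Illustrative override:
    ///
    /// ```ignore
    /// // Commit object chunks of 1000 rows instead of the default 500.
    /// std::env::set_var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE", "1000");
    /// let store = PgIndexerStore::new(pool, metrics);
    /// ```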
    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
            .expect("failed to initialize partition manager");
        let config = PgIndexerStoreConfig {
            parallel_chunk_size,
            parallel_objects_chunk_size,
        };

        Self {
            blocking_cp,
            metrics,
            partition_manager,
            config,
        }
    }

    pub fn get_metrics(&self) -> IndexerMetrics {
        self.metrics.clone()
    }

    pub fn blocking_cp(&self) -> ConnectionPool {
        self.blocking_cp.clone()
    }

    pub(crate) async fn get_latest_epoch_id_in_blocking_worker(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_latest_epoch_id())
            .await
    }

    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::epoch))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest epoch id from PostgresDB")
    }
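
    /// Returns the `(start, end)` range of protocol versions left to index:
    /// `start` is one past the highest version already in `protocol_configs`
    /// (1 when the table is empty) and `end` is the highest protocol version
    /// recorded in `epochs`.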
    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }

    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }

    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select(max(checkpoints::sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest checkpoint sequence number from PostgresDB")
    }

    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select((
                    min(checkpoints::sequence_number),
                    max(checkpoints::sequence_number),
                ))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
    }

    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((min(epochs::epoch), max(epochs::epoch)))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max epoch numbers from PostgresDB")
    }

    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.unwrap_or_default() as u64)
        })
        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
    }

    fn get_checkpoint_range_for_epoch(
        &self,
        epoch: u64,
    ) -> Result<(u64, Option<u64>), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
                .filter(epochs::epoch.eq(epoch as i64))
                .first::<(i64, Option<i64>)>(conn)
                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
        })
        .context(
            format!("failed reading checkpoint range from PostgresDB for epoch {epoch}").as_str(),
        )
    }

    fn get_transaction_range_for_checkpoint(
        &self,
        checkpoint: u64,
    ) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select((
                    pruner_cp_watermark::min_tx_sequence_number,
                    pruner_cp_watermark::max_tx_sequence_number,
                ))
                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
                .first::<(i64, i64)>(conn)
                .map(|(min, max)| (min as u64, max as u64))
        })
        .context(
            format!("failed reading transaction range from PostgresDB for checkpoint {checkpoint}")
                .as_str(),
        )
    }

    pub(crate) async fn get_global_order_for_tx_seq_in_blocking_worker(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_global_order_for_tx_seq(tx_seq))
            .await
    }

    fn get_global_order_for_tx_seq(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        let result = read_only_blocking!(&self.blocking_cp, |conn| {
            tx_global_order::dsl::tx_global_order
                .select((
                    tx_global_order::global_sequence_number,
                    tx_global_order::optimistic_sequence_number,
                ))
                .filter(tx_global_order::chk_tx_sequence_number.eq(tx_seq))
                .first::<(i64, i64)>(conn)
        })
        .context(
            format!("failed reading global sequence number from PostgresDB for tx seq {tx_seq}")
                .as_str(),
        )?;
        let (global_sequence_number, optimistic_sequence_number) = result;
        Ok(TxGlobalOrderCursor {
            global_sequence_number,
            optimistic_sequence_number,
        })
    }

    pub(crate) async fn prune_optimistic_transactions_up_to_in_blocking_worker(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        self.execute_in_blocking_worker(move |this| {
            this.prune_optimistic_transactions_up_to(to, limit)
        })
        .await
    }
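
    /// Deletes at most `limit` rows of `optimistic_transactions` whose
    /// `(global_sequence_number, optimistic_sequence_number)` pair is
    /// row-wise `<=` the cursor `to`, returning the number of rows deleted.
    /// The CTE locks its victims with `FOR UPDATE` so concurrent pruners
    /// cannot delete the same batch twice.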
    fn prune_optimistic_transactions_up_to(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                let sql = r#"
                    WITH ids_to_delete AS (
                        SELECT global_sequence_number, optimistic_sequence_number
                        FROM optimistic_transactions
                        WHERE (global_sequence_number, optimistic_sequence_number) <= ($1, $2)
                        ORDER BY global_sequence_number, optimistic_sequence_number
                        FOR UPDATE LIMIT $3
                    )
                    DELETE FROM optimistic_transactions otx
                    USING ids_to_delete
                    WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
                          (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
                "#;
                diesel::sql_query(sql)
                    .bind::<BigInt, _>(to.global_sequence_number)
                    .bind::<BigInt, _>(to.optimistic_sequence_number)
                    .bind::<BigInt, _>(limit)
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context(
                        format!("failed to prune optimistic_transactions table to {to:?} with limit {limit}").as_str(),
                    )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_latest_object_snapshot_watermark(
        &self,
    ) -> Result<Option<CommitterWatermark>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            watermarks::table
                .select((
                    watermarks::epoch_hi_inclusive,
                    watermarks::checkpoint_hi_inclusive,
                    watermarks::tx_hi,
                ))
                .filter(
                    watermarks::entity
                        .eq(ObjectsSnapshotHandlerTables::ObjectsSnapshot.to_string()),
                )
                .first::<(i64, i64, i64)>(conn)
                .optional()
                .map(|v| {
                    v.map(|(epoch, cp, tx)| CommitterWatermark {
                        epoch_hi_inclusive: epoch as u64,
                        checkpoint_hi_inclusive: cp as u64,
                        tx_hi: tx as u64,
                    })
                })
        })
        .context("Failed reading latest object snapshot watermark from PostgresDB")
    }

    fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            objects_snapshot::table
                .select(max(objects_snapshot::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as CheckpointSequenceNumber))
        })
        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
    }

    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            {
                let value = display_updates.values().collect::<Vec<_>>();
                |conn| self.persist_displays_in_existing_transaction(conn, value)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }
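
    /// Upserts live objects into `objects` with a single array-bound `UNNEST`
    /// statement. Each row is joined against `tx_global_order` by the digest
    /// of the transaction that produced it and is only written when that
    /// transaction has no optimistic entry (`optimistic_sequence_number IS
    /// NULL OR = 0`), so checkpointed writes do not clobber newer optimistic
    /// state.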
    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            INSERT INTO objects (
                object_id,
                object_version,
                object_digest,
                owner_type,
                owner_id,
                object_type,
                object_type_package,
                object_type_module,
                object_type_name,
                serialized_object,
                coin_type,
                coin_balance,
                df_kind
            )
            SELECT
                u.object_id,
                u.object_version,
                u.object_digest,
                u.owner_type,
                u.owner_id,
                u.object_type,
                u.object_type_package,
                u.object_type_module,
                u.object_type_name,
                u.serialized_object,
                u.coin_type,
                u.coin_balance,
                u.df_kind
            FROM UNNEST(
                $1::BYTEA[],
                $2::BIGINT[],
                $3::BYTEA[],
                $4::SMALLINT[],
                $5::BYTEA[],
                $6::TEXT[],
                $7::BYTEA[],
                $8::TEXT[],
                $9::TEXT[],
                $10::BYTEA[],
                $11::TEXT[],
                $12::BIGINT[],
                $13::SMALLINT[],
                $14::BYTEA[]
            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest)
            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            ON CONFLICT (object_id) DO UPDATE
            SET
                object_version = EXCLUDED.object_version,
                object_digest = EXCLUDED.object_digest,
                owner_type = EXCLUDED.owner_type,
                owner_id = EXCLUDED.owner_id,
                object_type = EXCLUDED.object_type,
                object_type_package = EXCLUDED.object_type_package,
                object_type_module = EXCLUDED.object_type_module,
                object_type_name = EXCLUDED.object_type_name,
                serialized_object = EXCLUDED.serialized_object,
                coin_type = EXCLUDED.coin_type,
                coin_balance = EXCLUDED.coin_balance,
                df_kind = EXCLUDED.df_kind
        "#;
        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
            .into_iter()
            .map(LiveObject::split)
            .map(|(indexed_object, tx_digest)| {
                (
                    StoredObject::from(indexed_object),
                    tx_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(objects.object_ids)
            .bind::<Array<BigInt>, _>(objects.object_versions)
            .bind::<Array<Bytea>, _>(objects.object_digests)
            .bind::<Array<SmallInt>, _>(objects.owner_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
            .bind::<Array<Bytea>, _>(objects.serialized_objects)
            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object mutations with error: {e}");
        })
    }
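
    /// Deletes removed objects from `objects`, applying the same
    /// `tx_global_order` guard as `persist_changed_objects`: a deletion takes
    /// effect only when the producing transaction has no optimistic entry
    /// (`optimistic_sequence_number IS NULL OR = 0`).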
    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            DELETE FROM objects
            WHERE object_id IN (
                SELECT u.object_id
                FROM UNNEST(
                    $1::BYTEA[],
                    $2::BYTEA[]
                ) AS u(object_id, tx_digest)
                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            )
        "#;
        let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
            .into_iter()
            .map(|removed_object| {
                (
                    removed_object.object_id().to_vec(),
                    removed_object.transaction_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(object_ids)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object deletions with error: {e}");
        })
    }

    fn persist_object_mutation_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        on_conflict_do_update!(
            objects::table,
            mutated_object_mutation_chunk,
            objects::object_id,
            (
                objects::object_id.eq(excluded(objects::object_id)),
                objects::object_version.eq(excluded(objects::object_version)),
                objects::object_digest.eq(excluded(objects::object_digest)),
                objects::owner_type.eq(excluded(objects::owner_type)),
                objects::owner_id.eq(excluded(objects::owner_id)),
                objects::object_type.eq(excluded(objects::object_type)),
                objects::serialized_object.eq(excluded(objects::serialized_object)),
                objects::coin_type.eq(excluded(objects::coin_type)),
                objects::coin_balance.eq(excluded(objects::coin_balance)),
                objects::df_kind.eq(excluded(objects::df_kind)),
            ),
            conn
        );
        Ok::<(), IndexerError>(())
    }

    fn persist_object_deletion_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        diesel::delete(
            objects::table.filter(
                objects::object_id.eq_any(
                    deleted_objects_chunk
                        .iter()
                        .map(|o| o.object_id.clone())
                        .collect::<Vec<_>>(),
                ),
            ),
        )
        .execute(conn)
        .map_err(IndexerError::from)
        .context("Failed to write object deletion to PostgresDB")?;

        Ok::<(), IndexerError>(())
    }

    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object snapshot with error: {e}");
        })
    }

    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object history with error: {e}");
        })
    }

    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object versions with error: {e}");
        })
    }
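
    /// Persists a batch of checkpoints together with their bookkeeping rows.
    /// For the genesis checkpoint (sequence number 0) this also records the
    /// chain identifier plus protocol configs and feature flags. The
    /// `pruner_cp_watermark` rows are committed first, then the `checkpoints`
    /// rows themselves, with commit-lag metrics updated per chunk.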
    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics
                            .max_committed_checkpoint_sequence_number
                            .set(stored_checkpoint.sequence_number);
                        self.metrics
                            .committed_checkpoint_timestamp_ms
                            .set(stored_checkpoint.timestamp_ms);
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!(
                            "Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}",
                            stored_checkpoint.sequence_number,
                            time_now_ms,
                            stored_checkpoint.timestamp_ms
                        );
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist checkpoints with error: {e}");
        })
    }

    fn persist_transactions_chunk(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks
            .start_timer();
        let transformation_guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks_transformation
            .start_timer();
        let transactions = transactions
            .iter()
            .map(StoredTransaction::from)
            .collect::<Vec<_>>();
        drop(transformation_guard);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked transactions",
                transactions.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist transactions with error: {e}");
        })
    }

    fn persist_tx_global_order_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist txs insertion order with error: {e}");
        })
    }
    fn update_status_for_checkpoint_transactions_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        let num_transactions = tx_order.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
                    conn
                );
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::chk_tx_sequence_number
                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
                    tx_global_order::chk_tx_sequence_number.is_null(),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Updated {} chunked values of `tx_global_order`", num_transactions
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to update `tx_global_order` with error: {e}");
        })
    }

    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events_chunks
            .start_timer();
        let len = events.len();
        let events = events
            .into_iter()
            .map(StoredEvent::from)
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(events::table, event_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked events", len);
        })
        .tap_err(|e| {
            tracing::error!("failed to persist events with error: {e}");
        })
    }

    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_packages
            .start_timer();
        let packages = packages
            .into_iter()
            .map(StoredPackage::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update!(
                        packages::table,
                        packages_chunk,
                        packages::package_id,
                        (
                            packages::package_id.eq(excluded(packages::package_id)),
                            packages::move_package.eq(excluded(packages::move_package)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} packages", packages.len());
        })
        .tap_err(|e| {
            tracing::error!("failed to persist packages with error: {e}");
        })
    }
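
    /// Fans the seven event index tables out to parallel blocking tasks and
    /// awaits them all; a single fold first splits every `EventIndex` into
    /// per-table rows so that each task owns its own `Vec`.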
    async fn persist_event_indices_chunk(
        &self,
        indices: Vec<EventIndex>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices_chunks
            .start_timer();
        let len = indices.len();
        let (
            event_emit_packages,
            event_emit_modules,
            event_senders,
            event_struct_packages,
            event_struct_modules,
            event_struct_names,
            event_struct_instantiations,
        ) = indices.into_iter().map(|i| i.split()).fold(
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            ),
            |(
                mut event_emit_packages,
                mut event_emit_modules,
                mut event_senders,
                mut event_struct_packages,
                mut event_struct_modules,
                mut event_struct_names,
                mut event_struct_instantiations,
            ),
             index| {
                event_emit_packages.push(index.0);
                event_emit_modules.push(index.1);
                event_senders.push(index.2);
                event_struct_packages.push(index.3);
                event_struct_modules.push(index.4);
                event_struct_names.push(index.5);
                event_struct_instantiations.push(index.6);
                (
                    event_emit_packages,
                    event_emit_modules,
                    event_senders,
                    event_struct_packages,
                    event_struct_modules,
                    event_struct_names,
                    event_struct_instantiations,
                )
            },
        );

        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_package::table,
                event_emit_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_module::table,
                event_emit_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_package::table,
                event_struct_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_module::table,
                event_struct_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_name::table,
                event_struct_names,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_instantiation::table,
                event_struct_instantiations,
                &this.blocking_cp
            )
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join event indices futures in a chunk: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked event indices", len);
        Ok(())
    }
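
    /// Persists all transaction index tables (senders, recipients, input /
    /// changed / wrapped-or-deleted objects, calls, digests, and kinds) in
    /// parallel blocking tasks, after flattening the per-transaction
    /// `TxIndexSplit` rows into one `Vec` per table.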
    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices_chunks
            .start_timer();
        let len = indices.len();

        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();

        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
        let recipients: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_recipients.clone())
            .collect();
        let input_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_input_objects.clone())
            .collect();
        let changed_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_changed_objects.clone())
            .collect();
        let wrapped_or_deleted_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
            .collect();
        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();

        let futures = [
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_changed_objects::table,
                    changed_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_wrapped_or_deleted_objects::table,
                    wrapped_or_deleted_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
            }),
        ];

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join tx indices futures in a chunk: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked tx_indices", len);
        Ok(())
    }

    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                if let Some(last_epoch) = &epoch.last_epoch {
                    info!(last_epoch.epoch, "Persisting epoch end data.");
                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
                        .set(last_epoch)
                        .execute(conn)?;
                }

                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist epoch with error: {e}");
        })
    }
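
    /// Advances every epoch-partitioned table to the new epoch: for each
    /// table whose strategy is epoch-partitioned, a new partition is created
    /// from the `EpochPartitionData` composed out of the committed epoch pair.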
    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
        if let Some(last_epoch_id) = last_epoch_id {
            let last_db_epoch: Option<StoredEpochInfo> =
                read_only_blocking!(&self.blocking_cp, |conn| {
                    epochs::table
                        .filter(epochs::epoch.eq(last_epoch_id))
                        .first::<StoredEpochInfo>(conn)
                        .optional()
                })
                .context("Failed to read last epoch from PostgresDB")?;
            if let Some(last_epoch) = last_db_epoch {
                let epoch_partition_data =
                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
                let table_partitions = self.partition_manager.get_table_partitions()?;
                for (table, (_, last_partition)) in table_partitions {
                    if !self
                        .partition_manager
                        .get_strategy(&table)
                        .is_epoch_partitioned()
                    {
                        continue;
                    }
                    let guard = self.metrics.advance_epoch_latency.start_timer();
                    self.partition_manager.advance_epoch(
                        table.clone(),
                        last_partition,
                        &epoch_partition_data,
                    )?;
                    let elapsed = guard.stop_and_record();
                    info!(
                        elapsed,
                        "Advanced epoch partition {} for table {}",
                        last_partition,
                        table.clone()
                    );
                }
            } else {
                tracing::error!("last epoch {last_epoch_id} was not found in PostgresDB.");
            }
        }

        Ok(())
    }

    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    event_emit_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_emit_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_package table"
                );
                prune_tx_or_event_indice_table!(
                    event_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_senders table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_instantiation,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_instantiation table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_name,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_name table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_package table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    tx_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_senders table"
                );
                prune_tx_or_event_indice_table!(
                    tx_recipients,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_recipients table"
                );
                prune_tx_or_event_indice_table!(
                    tx_input_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_input_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_changed_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_changed_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_wrapped_or_deleted_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_wrapped_or_deleted_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_pkg,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_pkg table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_mod,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_mod table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_fun,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_fun table"
                );
                prune_tx_or_event_indice_table!(
                    tx_digests,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_digests table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    pruner_cp_watermark::table
                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune pruner_cp_watermark table")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::table
                .filter(epochs::epoch.eq(epoch as i64))
                .select(epochs::network_total_transactions)
                .get_result::<Option<i64>>(conn)
        })
        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
        .map(|option| option.map(|v| v as u64))
    }

    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("failed to refresh participation_metrics: {e}");
        })
    }
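
    /// Upserts the committer's upper-bound watermark for every table named by
    /// the enum `E`; each variant's string form is used as the `entity` key.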
    fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();

        let upper_bound_updates = E::iter()
            .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&upper_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
                        watermarks::checkpoint_hi_inclusive
                            .eq(excluded(watermarks::checkpoint_hi_inclusive)),
                        watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
                    ))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("Failed to update watermarks upper bound")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {e}");
        })
    }

    fn map_epochs_to_cp_tx(
        &self,
        epochs: &[u64],
    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
        let pool = &self.blocking_cp;
        let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
            epochs::table
                .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
                .select((
                    epochs::epoch,
                    epochs::first_checkpoint_id,
                    epochs::first_tx_sequence_number,
                ))
                .load::<(i64, i64, i64)>(conn)
        })
        .context("Failed to fetch first checkpoint and tx seq num for epochs")?;

        Ok(results
            .into_iter()
            .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
            .collect())
    }
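
    /// Lowers the pruner watermarks. Each `(table, epoch)` pair is resolved
    /// to a `(checkpoint, tx)` lower bound via `map_epochs_to_cp_tx`, and the
    /// upsert only takes effect when the new `reader_lo`, `epoch_lo`, and
    /// timestamp are all strictly greater than the stored values, so a
    /// watermark can never move backwards.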
    fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        use diesel::query_dsl::methods::FilterDsl;

        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
            .into_iter()
            .map(|(table, epoch)| {
                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
                    IndexerError::PersistentStorageDataCorruption(format!(
                        "epoch {epoch} not found in epoch mapping",
                    ))
                })?;
                Ok(StoredWatermark::from_lower_bound_update(
                    table.as_ref(),
                    epoch,
                    table.select_reader_lo(*checkpoint, *tx),
                ))
            })
            .collect();
        let lower_bound_updates = lookups?;
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&lower_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
                        watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
                        watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )),
                    ))
                    .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
                    .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
                    .filter(
                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )
                        .gt(watermarks::timestamp_ms),
                    )
                    .execute(conn)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {e}");
        })?;
        Ok(())
    }

    fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        run_query_with_retry!(
            &self.blocking_cp,
            |conn| {
                let stored = watermarks::table
                    .load::<StoredWatermark>(conn)
                    .map_err(Into::into)
                    .context("Failed reading watermarks from PostgresDB")?;
                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                ))
                .get_result(conn)
                .map_err(Into::into)
                .context("Failed reading current timestamp from PostgresDB")?;
                Ok::<_, IndexerError>((stored, timestamp))
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            f(this)
        })
        .await
        .map_err(Into::into)
        .and_then(std::convert::identity)
    }
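
    /// Spawns `f` on the blocking thread pool without awaiting it, recording
    /// how long the task waited for a thread in the
    /// `tokio_blocking_task_wait_latency` metric. Unlike
    /// `execute_in_blocking_worker`, the caller is responsible for joining
    /// the returned handle.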
1755 fn spawn_blocking_task<F, R>(
1756 &self,
1757 f: F,
1758 ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1759 where
1760 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1761 R: Send + 'static,
1762 {
1763 let this = self.clone();
1764 let current_span = tracing::Span::current();
1765 let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1766 tokio::task::spawn_blocking(move || {
1767 let _guard = current_span.enter();
1768 let _elapsed = guard.stop_and_record();
1769 f(this)
1770 })
1771 }
1772
1773 fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1774 where
1775 F: FnOnce(Self) -> Fut + Send + 'static,
1776 Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1777 R: Send + 'static,
1778 {
1779 let this = self.clone();
1780 tokio::task::spawn(async move { f(this).await })
1781 }
1782}
1783
1784#[async_trait]
1785impl IndexerStore for PgIndexerStore {
1786 async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1787 self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1788 .await
1789 }
1790
1791 async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1792 self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1793 .await
1794 }
1795
1796 async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1797 self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1798 .await
1799 }
1800
1801 async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1802 self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1803 .await
1804 }
1805
1806 async fn get_latest_object_snapshot_watermark(
1807 &self,
1808 ) -> Result<Option<CommitterWatermark>, IndexerError> {
1809 self.execute_in_blocking_worker(|this| this.get_latest_object_snapshot_watermark())
1810 .await
1811 }
1812
1813 async fn get_latest_object_snapshot_checkpoint_sequence_number(
1814 &self,
1815 ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
1816 self.execute_in_blocking_worker(|this| {
1817 this.get_latest_object_snapshot_checkpoint_sequence_number()
1818 })
1819 .await
1820 }
1821
1822 fn persist_objects_in_existing_transaction(
1823 &self,
1824 conn: &mut PgConnection,
1825 object_changes: Vec<TransactionObjectChangesToCommit>,
1826 ) -> Result<(), IndexerError> {
1827 if object_changes.is_empty() {
1828 return Ok(());
1829 }
1830
1831 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1832 let object_mutations = indexed_mutations
1833 .into_iter()
1834 .map(StoredObject::from)
1835 .collect::<Vec<_>>();
1836 let object_deletions = indexed_deletions
1837 .into_iter()
1838 .map(StoredDeletedObject::from)
1839 .collect::<Vec<_>>();
1840
1841 self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1842 self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1843
1844 Ok(())
1845 }
1846
1847 async fn persist_objects_snapshot(
1848 &self,
1849 object_changes: Vec<TransactionObjectChangesToCommit>,
1850 ) -> Result<(), IndexerError> {
1851 if object_changes.is_empty() {
1852 return Ok(());
1853 }
1854 let guard = self
1855 .metrics
1856 .checkpoint_db_commit_latency_objects_snapshot
1857 .start_timer();
1858 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1859 let objects_snapshot = indexed_mutations
1860 .into_iter()
1861 .map(StoredObjectSnapshot::from)
1862 .chain(
1863 indexed_deletions
1864 .into_iter()
1865 .map(StoredObjectSnapshot::from),
1866 )
1867 .collect::<Vec<_>>();
1868 let len = objects_snapshot.len();
1869 let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1870 let futures = chunks
1871 .into_iter()
1872 .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1873
1874 futures::future::try_join_all(futures)
1875 .await
1876 .map_err(|e| {
1877 tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
1878 IndexerError::from(e)
1879 })?
1880 .into_iter()
1881 .collect::<Result<Vec<_>, _>>()
1882 .map_err(|e| {
1883 IndexerError::PostgresWrite(format!(
1884 "Failed to persist all objects snapshot chunks: {e:?}"
1885 ))
1886 })?;
1887 let elapsed = guard.stop_and_record();
1888 info!(elapsed, "Persisted {} objects snapshot", len);
1889 Ok(())
1890 }
1891
    async fn persist_object_history(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
            .map(|val| val.eq_ignore_ascii_case("true"))
            .unwrap_or(false);
        if skip_history {
            info!("skipping object history");
            return Ok(());
        }

        if object_changes.is_empty() {
            return Ok(());
        }
        let objects = make_objects_history_to_commit(object_changes);
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history
            .start_timer();

        let len = objects.len();
        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects history chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects history rows", len);
        Ok(())
    }

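    /// Persists `objects_version` rows in parallel chunks of
    /// `parallel_objects_chunk_size`.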
    async fn persist_object_versions(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        if object_versions.is_empty() {
            return Ok(());
        }

        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version
            .start_timer();

        let object_versions_count = object_versions.len();

        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
            .collect::<Vec<_>>();

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects version chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {object_versions_count} object versions");
        Ok(())
    }

    async fn persist_checkpoints(
        &self,
        checkpoints: Vec<IndexedCheckpoint>,
    ) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
            .await
    }

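    /// Persists indexed transactions in parallel chunks of
    /// `parallel_chunk_size`, recording commit latency in the metrics.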
    async fn persist_transactions(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions
            .start_timer();
        let len = transactions.len();

        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all transactions chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} transactions", len);
        Ok(())
    }

    fn persist_optimistic_transaction_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
        Ok(())
    }

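    /// Persists indexed events in parallel chunks of `parallel_chunk_size`.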
    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        if events.is_empty() {
            return Ok(());
        }
        let len = events.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events
            .start_timer();
        let chunks = chunk!(events, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_events_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} events", len);
        Ok(())
    }

    async fn persist_displays(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
            .await?
    }

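    /// Upserts `Display` rows keyed by `object_type` inside the caller's
    /// transaction; an existing row is only overwritten when the incoming
    /// version is strictly greater than the stored one.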
    fn persist_displays_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        display_updates: Vec<&StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        on_conflict_do_update_with_condition!(
            display::table,
            display_updates,
            display::object_type,
            (
                display::id.eq(excluded(display::id)),
                display::version.eq(excluded(display::version)),
                display::bcs.eq(excluded(display::bcs)),
            ),
            excluded(display::version).gt(display::version),
            conn
        );

        Ok(())
    }

    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
            .await
    }

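    /// Persists event index entries (event senders, emitting packages and
    /// modules, struct names, and so on) in parallel chunks on async tasks.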
    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_event_indices_chunk(chunk).await
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} event_indices", len);
        Ok(())
    }

    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
            .await
    }

    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
            .await
    }

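    /// Prunes all checkpoints of `epoch` starting from the minimum prunable
    /// checkpoint: per checkpoint, this removes the checkpoint row, the
    /// transaction and event indices of its transaction range, and its
    /// `pruner_cp_watermark` entry. Failures of individual steps are logged
    /// and skipped so pruning keeps making progress.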
    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
            _ => Err(IndexerError::PostgresRead(format!(
                "Failed to get checkpoint range for epoch {epoch}"
            ))),
        }?;

        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
        min_cp = std::cmp::max(min_cp, min_prunable_cp);
        for cp in min_cp..=max_cp {
            info!(
                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
                cp, epoch, min_prunable_cp
            );
            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune checkpoint {cp}: {e}");
                });

            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
            self.execute_in_blocking_worker(move |this| {
                this.prune_tx_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune transactions for cp {cp}: {e}");
            });
            info!(
                "Pruned transactions for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.execute_in_blocking_worker(move |this| {
                this.prune_event_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune events of transactions for cp {cp}: {e}");
            });
            info!(
                "Pruned events of transactions for checkpoint {cp} from tx {min_tx} to tx {max_tx}"
            );
            self.metrics.last_pruned_transaction.set(max_tx as i64);

            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune pruner_cp_watermark table for cp {cp}: {e}");
                });
            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
            self.metrics.last_pruned_checkpoint.set(cp as i64);
        }

        Ok(())
    }

    async fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| {
            this.get_network_total_transactions_by_end_of_epoch(epoch)
        })
        .await
    }

    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
            .await
    }

    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        self.execute_in_blocking_worker(move |this| {
            this.update_watermarks_upper_bound::<E>(watermark)
        })
        .await
    }

    fn as_any(&self) -> &dyn StdAny {
        self
    }

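    /// Derives the chain identifier from the raw `chain_id` bytes, collects
    /// the configs and feature flags of every protocol version in the
    /// supported index range, and inserts them (ignoring conflicts) within a
    /// single retried transaction.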
    fn persist_protocol_configs_and_feature_flags(
        &self,
        chain_id: Vec<u8>,
    ) -> Result<(), IndexerError> {
        let chain_id = ChainIdentifier::from(
            CheckpointDigest::try_from(chain_id).expect("unable to convert chain id"),
        );

        let mut all_configs = vec![];
        let mut all_flags = vec![];

        let (start_version, end_version) = self.get_protocol_version_index_range()?;
        info!(
            "Persisting protocol configs with start_version: {}, end_version: {}",
            start_version, end_version
        );

        for version in start_version..=end_version {
            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
                (version as u64).into(),
                chain_id.chain(),
            )
            .ok_or(IndexerError::Generic(format!(
                "Unable to fetch protocol config for version {} and chain {:?}",
                version,
                chain_id.chain()
            )))?;
            let configs_vec = protocol_configs
                .attr_map()
                .into_iter()
                .map(|(k, v)| StoredProtocolConfig {
                    protocol_version: version,
                    config_name: k,
                    config_value: v.map(|v| v.to_string()),
                })
                .collect::<Vec<_>>();
            all_configs.extend(configs_vec);

            let feature_flags = protocol_configs
                .feature_map()
                .into_iter()
                .map(|(k, v)| StoredFeatureFlag {
                    protocol_version: version,
                    flag_name: k,
                    flag_value: v,
                })
                .collect::<Vec<_>>();
            all_flags.extend(feature_flags);
        }

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
                }
                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;
        Ok(())
    }

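    /// Persists transaction index entries in parallel chunks of
    /// `parallel_chunk_size` on async tasks.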
    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_tx_indices_chunk_v2(chunk).await
            })
        });
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} tx_indices", len);
        Ok(())
    }

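    /// Persists the per-checkpoint object changes: the batch is first reduced
    /// to the latest state per object, then changed and removed objects are
    /// written concurrently in parallel blocking chunks.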
    async fn persist_checkpoint_objects(
        &self,
        objects: Vec<CheckpointObjectChanges>,
    ) -> Result<(), IndexerError> {
        if objects.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects
            .start_timer();
        let CheckpointObjectChanges {
            changed_objects: mutations,
            deleted_objects: deletions,
        } = retain_latest_objects_from_checkpoint_batch(objects);
        let mutation_len = mutations.len();
        let deletion_len = deletions.len();

        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
        let mutation_futures = mutation_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
        let deletion_futures = deletion_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
            .await
            .map_err(|e| {
                tracing::error!("failed to join futures for persisting objects: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}"))
            })?;

        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
        );
        Ok(())
    }

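    /// Updates the index status of `tx_global_order` rows for the given
    /// checkpoint transactions, in parallel chunks.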
    async fn update_status_for_checkpoint_transactions(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks.into_iter().map(|c| {
            self.spawn_blocking_task(move |this| {
                this.update_status_for_checkpoint_transactions_chunk(c)
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "failed to join update_status_for_checkpoint_transactions_chunk futures: {e}",
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to update all `tx_global_order` chunks: {e:?}",
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Updated index status for {len} transaction insertion orders"
        );
        Ok(())
    }

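    /// Persists the global insertion order of checkpoint transactions into
    /// `tx_global_order` in parallel chunks.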
    async fn persist_tx_global_order(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all transaction insertion order chunks: {e:?}",
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {len} transaction insertion orders");
        Ok(())
    }

    async fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
            .await
    }

    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_watermarks())
            .await
    }
}

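/// Flattens per-transaction object change sets into `objects_history` rows,
/// emitting deletion records ahead of mutation records.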
fn make_objects_history_to_commit(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Vec<StoredHistoryObject> {
    // Build both row sets in a single pass instead of cloning the whole
    // change set; deleted objects are emitted ahead of mutated ones, as
    // before.
    let mut deleted_objects: Vec<StoredHistoryObject> = Vec::new();
    let mut mutated_objects: Vec<StoredHistoryObject> = Vec::new();
    for changes in tx_object_changes {
        deleted_objects.extend(changes.deleted_objects.into_iter().map(|o| o.into()));
        mutated_objects.extend(changes.changed_objects.into_iter().map(|o| o.into()));
    }
    deleted_objects.extend(mutated_objects);
    deleted_objects
}

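/// Collapses a batch of per-transaction object changes into at most one final
/// change per object ID: when the same object is touched repeatedly, only the
/// highest-versioned mutation or deletion survives, and the asserts below
/// enforce that versions only move forward within the batch.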
fn retain_latest_indexed_objects(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();

    for change in tx_object_changes {
        for mutation in change.changed_objects {
            let id = mutation.object.id();
            let version = mutation.object.version();

            // A later mutation supersedes an earlier deletion of the same
            // object within this batch.
            if let Some(existing) = deletions.remove(&id) {
                assert!(
                    existing.object_version < version.value(),
                    "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }

            if let Some(existing) = mutations.insert(id, mutation) {
                assert!(
                    existing.object.version() < version,
                    "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version()
                );
            }
        }
        for deletion in change.deleted_objects {
            let id = deletion.object_id;
            let version = deletion.object_version;

            // A later deletion supersedes an earlier mutation of the same
            // object within this batch.
            if let Some(existing) = mutations.remove(&id) {
                assert!(
                    existing.object.version().value() < version,
                    "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version(),
                );
            }

            if let Some(existing) = deletions.insert(id, deletion) {
                assert!(
                    existing.object_version < version,
                    "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }
        }
    }

    (
        mutations.into_values().collect(),
        deletions.into_values().collect(),
    )
}