iota_indexer/store/
pg_indexer_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use core::result::Result::Ok;
6use std::{any::Any as StdAny, collections::BTreeMap, time::Duration};
7
8use async_trait::async_trait;
9use diesel::{
10    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
11    dsl::{max, min},
12    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
13    upsert::excluded,
14};
15use downcast::Any;
16use iota_protocol_config::ProtocolConfig;
17use iota_types::{
18    base_types::ObjectID,
19    digests::{ChainIdentifier, CheckpointDigest},
20};
21use itertools::Itertools;
22use tap::TapFallible;
23use tracing::info;
24
25use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
26use crate::{
27    db::ConnectionPool,
28    errors::{Context, IndexerError, IndexerResult},
29    ingestion::{
30        common::prepare::{
31            CheckpointObjectChanges, LiveObject, RemovedObject,
32            retain_latest_objects_from_checkpoint_batch,
33        },
34        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
35    },
36    insert_or_ignore_into,
37    metrics::IndexerMetrics,
38    models::{
39        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
40        display::StoredDisplay,
41        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
42        events::StoredEvent,
43        obj_indices::StoredObjectVersion,
44        objects::{
45            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
46            StoredObjects,
47        },
48        packages::StoredPackage,
49        transactions::{
50            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
51        },
52        tx_indices::TxIndexSplit,
53    },
54    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
55    persist_chunk_into_table_in_existing_connection, read_only_blocking,
56    schema::{
57        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
58        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
59        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
60        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
61        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
62        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
63        tx_wrapped_or_deleted_objects,
64    },
65    store::IndexerStore,
66    transactional_blocking_with_retry,
67    types::{
68        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
69        IndexedPackage, IndexedTransaction, TxIndex,
70    },
71};
72
73/// A cursor representing the global order position of transaction according to
74/// tx_global_order table
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub struct TxGlobalOrderCursor {
77    pub global_sequence_number: i64,
78    pub optimistic_sequence_number: i64,
79}
80
81#[macro_export]
82macro_rules! chunk {
83    ($data: expr, $size: expr) => {{
84        $data
85            .into_iter()
86            .chunks($size)
87            .into_iter()
88            .map(|c| c.collect())
89            .collect::<Vec<Vec<_>>>()
90    }};
91}
92
93macro_rules! prune_tx_or_event_indice_table {
94    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
95        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
96            .execute($conn)
97            .map_err(IndexerError::from)
98            .context($context_msg)?;
99    };
100}
101
102// In one DB transaction, the update could be chunked into
103// a few statements, this is the amount of rows to update in one statement
104// TODO: I think with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX`
105// is now less relevant. We should do experiments and remove it if it's true.
106const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
107// The amount of rows to update in one DB transaction
108const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
109// The amount of rows to update in one DB transaction, for objects particularly
110// Having this number too high may cause many db deadlocks because of
111// optimistic locking.
112const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
113const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
114
115#[derive(Clone)]
116pub struct PgIndexerStoreConfig {
117    pub parallel_chunk_size: usize,
118    pub parallel_objects_chunk_size: usize,
119}
120
121pub struct PgIndexerStore {
122    blocking_cp: ConnectionPool,
123    metrics: IndexerMetrics,
124    partition_manager: PgPartitionManager,
125    config: PgIndexerStoreConfig,
126}
127
128impl Clone for PgIndexerStore {
129    fn clone(&self) -> PgIndexerStore {
130        Self {
131            blocking_cp: self.blocking_cp.clone(),
132            metrics: self.metrics.clone(),
133            partition_manager: self.partition_manager.clone(),
134            config: self.config.clone(),
135        }
136    }
137}
138
139impl PgIndexerStore {
140    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
141        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
142            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
143            .parse::<usize>()
144            .unwrap();
145        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
146            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
147            .parse::<usize>()
148            .unwrap();
149        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
150            .expect("failed to initialize partition manager");
151        let config = PgIndexerStoreConfig {
152            parallel_chunk_size,
153            parallel_objects_chunk_size,
154        };
155
156        Self {
157            blocking_cp,
158            metrics,
159            partition_manager,
160            config,
161        }
162    }
163
164    pub fn get_metrics(&self) -> IndexerMetrics {
165        self.metrics.clone()
166    }
167
168    pub fn blocking_cp(&self) -> ConnectionPool {
169        self.blocking_cp.clone()
170    }
171
172    pub(crate) async fn get_latest_epoch_id_in_blocking_worker(
173        &self,
174    ) -> Result<Option<u64>, IndexerError> {
175        self.execute_in_blocking_worker(move |this| this.get_latest_epoch_id())
176            .await
177    }
178
179    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
180        read_only_blocking!(&self.blocking_cp, |conn| {
181            epochs::dsl::epochs
182                .select(max(epochs::epoch))
183                .first::<Option<i64>>(conn)
184                .map(|v| v.map(|v| v as u64))
185        })
186        .context("Failed reading latest epoch id from PostgresDB")
187    }
188
189    /// Get the range of the protocol versions that need to be indexed.
190    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
191        // We start indexing from the next protocol version after the latest one stored
192        // in the db.
193        let start = read_only_blocking!(&self.blocking_cp, |conn| {
194            protocol_configs::dsl::protocol_configs
195                .select(max(protocol_configs::protocol_version))
196                .first::<Option<i64>>(conn)
197        })
198        .context("Failed reading latest protocol version from PostgresDB")?
199        .map_or(1, |v| v + 1);
200
201        // We end indexing at the protocol version of the latest epoch stored in the db.
202        let end = read_only_blocking!(&self.blocking_cp, |conn| {
203            epochs::dsl::epochs
204                .select(max(epochs::protocol_version))
205                .first::<Option<i64>>(conn)
206        })
207        .context("Failed reading latest epoch protocol version from PostgresDB")?
208        .unwrap_or(1);
209        Ok((start, end))
210    }
211
212    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
213        read_only_blocking!(&self.blocking_cp, |conn| {
214            chain_identifier::dsl::chain_identifier
215                .select(chain_identifier::checkpoint_digest)
216                .first::<Vec<u8>>(conn)
217                .optional()
218        })
219        .context("Failed reading chain id from PostgresDB")
220    }
221
222    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
223        read_only_blocking!(&self.blocking_cp, |conn| {
224            checkpoints::dsl::checkpoints
225                .select(max(checkpoints::sequence_number))
226                .first::<Option<i64>>(conn)
227                .map(|v| v.map(|v| v as u64))
228        })
229        .context("Failed reading latest checkpoint sequence number from PostgresDB")
230    }
231
232    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
233        read_only_blocking!(&self.blocking_cp, |conn| {
234            checkpoints::dsl::checkpoints
235                .select((
236                    min(checkpoints::sequence_number),
237                    max(checkpoints::sequence_number),
238                ))
239                .first::<(Option<i64>, Option<i64>)>(conn)
240                .map(|(min, max)| {
241                    (
242                        min.unwrap_or_default() as u64,
243                        max.unwrap_or_default() as u64,
244                    )
245                })
246        })
247        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
248    }
249
250    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
251        read_only_blocking!(&self.blocking_cp, |conn| {
252            epochs::dsl::epochs
253                .select((min(epochs::epoch), max(epochs::epoch)))
254                .first::<(Option<i64>, Option<i64>)>(conn)
255                .map(|(min, max)| {
256                    (
257                        min.unwrap_or_default() as u64,
258                        max.unwrap_or_default() as u64,
259                    )
260                })
261        })
262        .context("Failed reading min and max epoch numbers from PostgresDB")
263    }
264
265    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
266        read_only_blocking!(&self.blocking_cp, |conn| {
267            pruner_cp_watermark::dsl::pruner_cp_watermark
268                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
269                .first::<Option<i64>>(conn)
270                .map(|v| v.unwrap_or_default() as u64)
271        })
272        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
273    }
274
275    fn get_checkpoint_range_for_epoch(
276        &self,
277        epoch: u64,
278    ) -> Result<(u64, Option<u64>), IndexerError> {
279        read_only_blocking!(&self.blocking_cp, |conn| {
280            epochs::dsl::epochs
281                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
282                .filter(epochs::epoch.eq(epoch as i64))
283                .first::<(i64, Option<i64>)>(conn)
284                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
285        })
286        .context(
287            format!("failed reading checkpoint range from PostgresDB for epoch {epoch}").as_str(),
288        )
289    }
290
291    fn get_transaction_range_for_checkpoint(
292        &self,
293        checkpoint: u64,
294    ) -> Result<(u64, u64), IndexerError> {
295        read_only_blocking!(&self.blocking_cp, |conn| {
296            pruner_cp_watermark::dsl::pruner_cp_watermark
297                .select((
298                    pruner_cp_watermark::min_tx_sequence_number,
299                    pruner_cp_watermark::max_tx_sequence_number,
300                ))
301                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
302                .first::<(i64, i64)>(conn)
303                .map(|(min, max)| (min as u64, max as u64))
304        })
305        .context(
306            format!("failed reading transaction range from PostgresDB for checkpoint {checkpoint}")
307                .as_str(),
308        )
309    }
310
311    pub(crate) async fn get_global_order_for_tx_seq_in_blocking_worker(
312        &self,
313        tx_seq: i64,
314    ) -> Result<TxGlobalOrderCursor, IndexerError> {
315        self.execute_in_blocking_worker(move |this| this.get_global_order_for_tx_seq(tx_seq))
316            .await
317    }
318
319    fn get_global_order_for_tx_seq(
320        &self,
321        tx_seq: i64,
322    ) -> Result<TxGlobalOrderCursor, IndexerError> {
323        let result = read_only_blocking!(&self.blocking_cp, |conn| {
324            tx_global_order::dsl::tx_global_order
325                .select((
326                    tx_global_order::global_sequence_number,
327                    tx_global_order::optimistic_sequence_number,
328                ))
329                .filter(tx_global_order::chk_tx_sequence_number.eq(tx_seq))
330                .first::<(i64, i64)>(conn)
331        })
332        .context(
333            format!("failed reading global sequence number from PostgresDB for tx seq {tx_seq}")
334                .as_str(),
335        )?;
336        let (global_sequence_number, optimistic_sequence_number) = result;
337        Ok(TxGlobalOrderCursor {
338            global_sequence_number,
339            optimistic_sequence_number,
340        })
341    }
342
343    pub(crate) async fn prune_optimistic_transactions_up_to_in_blocking_worker(
344        &self,
345        to: TxGlobalOrderCursor,
346        limit: i64,
347    ) -> IndexerResult<usize> {
348        self.execute_in_blocking_worker(move |this| {
349            this.prune_optimistic_transactions_up_to(to, limit)
350        })
351        .await
352    }
353
354    fn prune_optimistic_transactions_up_to(
355        &self,
356        to: TxGlobalOrderCursor,
357        limit: i64,
358    ) -> IndexerResult<usize> {
359        transactional_blocking_with_retry!(
360            &self.blocking_cp,
361            |conn| {
362                let sql = r#"
363                    WITH ids_to_delete AS (
364                         SELECT global_sequence_number, optimistic_sequence_number
365                         FROM optimistic_transactions
366                         WHERE (global_sequence_number, optimistic_sequence_number) <= ($1, $2)
367                         ORDER BY global_sequence_number, optimistic_sequence_number
368                         FOR UPDATE LIMIT $3
369                     )
370                     DELETE FROM optimistic_transactions otx
371                     USING ids_to_delete
372                     WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
373                           (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
374                "#;
375                diesel::sql_query(sql)
376                    .bind::<BigInt, _>(to.global_sequence_number)
377                    .bind::<BigInt, _>(to.optimistic_sequence_number)
378                    .bind::<BigInt, _>(limit)
379                    .execute(conn)
380                    .map_err(IndexerError::from)
381                    .context(
382                        format!("failed to prune optimistic_transactions table to {to:?} with limit {limit}").as_str(),
383                    )
384            },
385            PG_DB_COMMIT_SLEEP_DURATION
386        )
387    }
388
389    fn get_latest_object_snapshot_checkpoint_sequence_number(
390        &self,
391    ) -> Result<Option<u64>, IndexerError> {
392        read_only_blocking!(&self.blocking_cp, |conn| {
393            objects_snapshot::dsl::objects_snapshot
394                .select(max(objects_snapshot::checkpoint_sequence_number))
395                .first::<Option<i64>>(conn)
396                .map(|v| v.map(|v| v as u64))
397        })
398        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
399    }
400
401    fn persist_display_updates(
402        &self,
403        display_updates: BTreeMap<String, StoredDisplay>,
404    ) -> Result<(), IndexerError> {
405        transactional_blocking_with_retry!(
406            &self.blocking_cp,
407            {
408                let value = display_updates.values().collect::<Vec<_>>();
409                |conn| self.persist_displays_in_existing_transaction(conn, value)
410            },
411            PG_DB_COMMIT_SLEEP_DURATION
412        )?;
413
414        Ok(())
415    }
416
417    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
418        let guard = self
419            .metrics
420            .checkpoint_db_commit_latency_objects_chunks
421            .start_timer();
422        let len = objects.len();
423        let raw_query = r#"
424            INSERT INTO objects (
425                object_id,
426                object_version,
427                object_digest,
428                owner_type,
429                owner_id,
430                object_type,
431                object_type_package,
432                object_type_module,
433                object_type_name,
434                serialized_object,
435                coin_type,
436                coin_balance,
437                df_kind
438            )
439            SELECT
440                u.object_id,
441                u.object_version,
442                u.object_digest,
443                u.owner_type,
444                u.owner_id,
445                u.object_type,
446                u.object_type_package,
447                u.object_type_module,
448                u.object_type_name,
449                u.serialized_object,
450                u.coin_type,
451                u.coin_balance,
452                u.df_kind
453            FROM UNNEST(
454                $1::BYTEA[],
455                $2::BIGINT[],
456                $3::BYTEA[],
457                $4::SMALLINT[],
458                $5::BYTEA[],
459                $6::TEXT[],
460                $7::BYTEA[],
461                $8::TEXT[],
462                $9::TEXT[],
463                $10::BYTEA[],
464                $11::TEXT[],
465                $12::BIGINT[],
466                $13::SMALLINT[],
467                $14::BYTEA[]
468            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest)
469            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
470            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
471            ON CONFLICT (object_id) DO UPDATE
472            SET
473                object_version = EXCLUDED.object_version,
474                object_digest = EXCLUDED.object_digest,
475                owner_type = EXCLUDED.owner_type,
476                owner_id = EXCLUDED.owner_id,
477                object_type = EXCLUDED.object_type,
478                object_type_package = EXCLUDED.object_type_package,
479                object_type_module = EXCLUDED.object_type_module,
480                object_type_name = EXCLUDED.object_type_name,
481                serialized_object = EXCLUDED.serialized_object,
482                coin_type = EXCLUDED.coin_type,
483                coin_balance = EXCLUDED.coin_balance,
484                df_kind = EXCLUDED.df_kind
485        "#;
486        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
487            .into_iter()
488            .map(LiveObject::split)
489            .map(|(indexed_object, tx_digest)| {
490                (
491                    StoredObject::from(indexed_object),
492                    tx_digest.into_inner().to_vec(),
493                )
494            })
495            .unzip();
496        let query = diesel::sql_query(raw_query)
497            .bind::<Array<Bytea>, _>(objects.object_ids)
498            .bind::<Array<BigInt>, _>(objects.object_versions)
499            .bind::<Array<Bytea>, _>(objects.object_digests)
500            .bind::<Array<SmallInt>, _>(objects.owner_types)
501            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
502            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
503            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
504            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
505            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
506            .bind::<Array<Bytea>, _>(objects.serialized_objects)
507            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
508            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
509            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
510            .bind::<Array<Bytea>, _>(tx_digests);
511        transactional_blocking_with_retry!(
512            &self.blocking_cp,
513            |conn| {
514                query.clone().execute(conn)?;
515                Ok::<(), IndexerError>(())
516            },
517            PG_DB_COMMIT_SLEEP_DURATION
518        )
519        .tap_ok(|_| {
520            let elapsed = guard.stop_and_record();
521            info!(elapsed, "Persisted {len} chunked objects");
522        })
523        .tap_err(|e| {
524            tracing::error!("failed to persist object mutations with error: {e}");
525        })
526    }
527
528    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
529        let guard = self
530            .metrics
531            .checkpoint_db_commit_latency_objects_chunks
532            .start_timer();
533        let len = objects.len();
534        let raw_query = r#"
535            DELETE FROM objects
536            WHERE object_id IN (
537                SELECT u.object_id
538                FROM UNNEST(
539                    $1::BYTEA[],
540                    $2::BYTEA[]
541                ) AS u(object_id, tx_digest)
542                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
543                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
544            )
545        "#;
546        let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
547            .into_iter()
548            .map(|removed_object| {
549                (
550                    removed_object.object_id().to_vec(),
551                    removed_object.transaction_digest.into_inner().to_vec(),
552                )
553            })
554            .unzip();
555        let query = diesel::sql_query(raw_query)
556            .bind::<Array<Bytea>, _>(object_ids)
557            .bind::<Array<Bytea>, _>(tx_digests);
558        transactional_blocking_with_retry!(
559            &self.blocking_cp,
560            |conn| {
561                query.clone().execute(conn)?;
562                Ok::<(), IndexerError>(())
563            },
564            PG_DB_COMMIT_SLEEP_DURATION
565        )
566        .tap_ok(|_| {
567            let elapsed = guard.stop_and_record();
568            info!(elapsed, "Deleted {len} chunked objects");
569        })
570        .tap_err(|e| {
571            tracing::error!("failed to persist object deletions with error: {e}");
572        })
573    }
574
575    fn persist_object_mutation_chunk_in_existing_transaction(
576        &self,
577        conn: &mut PgConnection,
578        mutated_object_mutation_chunk: Vec<StoredObject>,
579    ) -> Result<(), IndexerError> {
580        on_conflict_do_update!(
581            objects::table,
582            mutated_object_mutation_chunk,
583            objects::object_id,
584            (
585                objects::object_id.eq(excluded(objects::object_id)),
586                objects::object_version.eq(excluded(objects::object_version)),
587                objects::object_digest.eq(excluded(objects::object_digest)),
588                objects::owner_type.eq(excluded(objects::owner_type)),
589                objects::owner_id.eq(excluded(objects::owner_id)),
590                objects::object_type.eq(excluded(objects::object_type)),
591                objects::serialized_object.eq(excluded(objects::serialized_object)),
592                objects::coin_type.eq(excluded(objects::coin_type)),
593                objects::coin_balance.eq(excluded(objects::coin_balance)),
594                objects::df_kind.eq(excluded(objects::df_kind)),
595            ),
596            conn
597        );
598        Ok::<(), IndexerError>(())
599    }
600
601    fn persist_object_deletion_chunk_in_existing_transaction(
602        &self,
603        conn: &mut PgConnection,
604        deleted_objects_chunk: Vec<StoredDeletedObject>,
605    ) -> Result<(), IndexerError> {
606        diesel::delete(
607            objects::table.filter(
608                objects::object_id.eq_any(
609                    deleted_objects_chunk
610                        .iter()
611                        .map(|o| o.object_id.clone())
612                        .collect::<Vec<_>>(),
613                ),
614            ),
615        )
616        .execute(conn)
617        .map_err(IndexerError::from)
618        .context("Failed to write object deletion to PostgresDB")?;
619
620        Ok::<(), IndexerError>(())
621    }
622
623    fn backfill_objects_snapshot_chunk(
624        &self,
625        objects_snapshot: Vec<StoredObjectSnapshot>,
626    ) -> Result<(), IndexerError> {
627        let guard = self
628            .metrics
629            .checkpoint_db_commit_latency_objects_snapshot_chunks
630            .start_timer();
631        transactional_blocking_with_retry!(
632            &self.blocking_cp,
633            |conn| {
634                for objects_snapshot_chunk in
635                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
636                {
637                    on_conflict_do_update!(
638                        objects_snapshot::table,
639                        objects_snapshot_chunk,
640                        objects_snapshot::object_id,
641                        (
642                            objects_snapshot::object_version
643                                .eq(excluded(objects_snapshot::object_version)),
644                            objects_snapshot::object_status
645                                .eq(excluded(objects_snapshot::object_status)),
646                            objects_snapshot::object_digest
647                                .eq(excluded(objects_snapshot::object_digest)),
648                            objects_snapshot::checkpoint_sequence_number
649                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
650                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
651                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
652                            objects_snapshot::object_type_package
653                                .eq(excluded(objects_snapshot::object_type_package)),
654                            objects_snapshot::object_type_module
655                                .eq(excluded(objects_snapshot::object_type_module)),
656                            objects_snapshot::object_type_name
657                                .eq(excluded(objects_snapshot::object_type_name)),
658                            objects_snapshot::object_type
659                                .eq(excluded(objects_snapshot::object_type)),
660                            objects_snapshot::serialized_object
661                                .eq(excluded(objects_snapshot::serialized_object)),
662                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
663                            objects_snapshot::coin_balance
664                                .eq(excluded(objects_snapshot::coin_balance)),
665                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
666                        ),
667                        conn
668                    );
669                }
670                Ok::<(), IndexerError>(())
671            },
672            PG_DB_COMMIT_SLEEP_DURATION
673        )
674        .tap_ok(|_| {
675            let elapsed = guard.stop_and_record();
676            info!(
677                elapsed,
678                "Persisted {} chunked objects snapshot",
679                objects_snapshot.len(),
680            );
681        })
682        .tap_err(|e| {
683            tracing::error!("failed to persist object snapshot with error: {e}");
684        })
685    }
686
687    fn persist_objects_history_chunk(
688        &self,
689        stored_objects_history: Vec<StoredHistoryObject>,
690    ) -> Result<(), IndexerError> {
691        let guard = self
692            .metrics
693            .checkpoint_db_commit_latency_objects_history_chunks
694            .start_timer();
695        transactional_blocking_with_retry!(
696            &self.blocking_cp,
697            |conn| {
698                for stored_objects_history_chunk in
699                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
700                {
701                    insert_or_ignore_into!(
702                        objects_history::table,
703                        stored_objects_history_chunk,
704                        conn
705                    );
706                }
707                Ok::<(), IndexerError>(())
708            },
709            PG_DB_COMMIT_SLEEP_DURATION
710        )
711        .tap_ok(|_| {
712            let elapsed = guard.stop_and_record();
713            info!(
714                elapsed,
715                "Persisted {} chunked objects history",
716                stored_objects_history.len(),
717            );
718        })
719        .tap_err(|e| {
720            tracing::error!("failed to persist object history with error: {e}");
721        })
722    }
723
724    fn persist_object_version_chunk(
725        &self,
726        object_versions: Vec<StoredObjectVersion>,
727    ) -> Result<(), IndexerError> {
728        let guard = self
729            .metrics
730            .checkpoint_db_commit_latency_objects_version_chunks
731            .start_timer();
732
733        transactional_blocking_with_retry!(
734            &self.blocking_cp,
735            |conn| {
736                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
737                {
738                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
739                }
740                Ok::<(), IndexerError>(())
741            },
742            PG_DB_COMMIT_SLEEP_DURATION
743        )
744        .tap_ok(|_| {
745            let elapsed = guard.stop_and_record();
746            info!(
747                elapsed,
748                "Persisted {} chunked object versions",
749                object_versions.len(),
750            );
751        })
752        .tap_err(|e| {
753            tracing::error!("failed to persist object versions with error: {e}");
754        })
755    }
756
757    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
758        let Some(first_checkpoint) = checkpoints.first() else {
759            return Ok(());
760        };
761
762        // If the first checkpoint has sequence number 0, we need to persist the digest
763        // as chain identifier.
764        if first_checkpoint.sequence_number == 0 {
765            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
766            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
767            transactional_blocking_with_retry!(
768                &self.blocking_cp,
769                |conn| {
770                    let checkpoint_digest =
771                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
772                    insert_or_ignore_into!(
773                        chain_identifier::table,
774                        StoredChainIdentifier { checkpoint_digest },
775                        conn
776                    );
777                    Ok::<(), IndexerError>(())
778                },
779                PG_DB_COMMIT_SLEEP_DURATION
780            )?;
781        }
782        let guard = self
783            .metrics
784            .checkpoint_db_commit_latency_checkpoints
785            .start_timer();
786
787        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
788        transactional_blocking_with_retry!(
789            &self.blocking_cp,
790            |conn| {
791                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
792                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
793                }
794                Ok::<(), IndexerError>(())
795            },
796            PG_DB_COMMIT_SLEEP_DURATION
797        )
798        .tap_ok(|_| {
799            info!(
800                "Persisted {} pruner_cp_watermark rows.",
801                stored_cp_txs.len(),
802            );
803        })
804        .tap_err(|e| {
805            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
806        })?;
807
808        let stored_checkpoints = checkpoints
809            .iter()
810            .map(StoredCheckpoint::from)
811            .collect::<Vec<_>>();
812        transactional_blocking_with_retry!(
813            &self.blocking_cp,
814            |conn| {
815                for stored_checkpoint_chunk in
816                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
817                {
818                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
819                    let time_now_ms = chrono::Utc::now().timestamp_millis();
820                    for stored_checkpoint in stored_checkpoint_chunk {
821                        self.metrics
822                            .db_commit_lag_ms
823                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
824                        self.metrics.max_committed_checkpoint_sequence_number.set(
825                            stored_checkpoint.sequence_number,
826                        );
827                        self.metrics.committed_checkpoint_timestamp_ms.set(
828                            stored_checkpoint.timestamp_ms,
829                        );
830                    }
831                    for stored_checkpoint in stored_checkpoint_chunk {
832                        info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
833                    }
834                }
835                Ok::<(), IndexerError>(())
836            },
837            PG_DB_COMMIT_SLEEP_DURATION
838        )
839        .tap_ok(|_| {
840            let elapsed = guard.stop_and_record();
841            info!(
842                elapsed,
843                "Persisted {} checkpoints",
844                stored_checkpoints.len()
845            );
846        })
847        .tap_err(|e| {
848            tracing::error!("failed to persist checkpoints with error: {e}");
849        })
850    }
851
852    fn persist_transactions_chunk(
853        &self,
854        transactions: Vec<IndexedTransaction>,
855    ) -> Result<(), IndexerError> {
856        let guard = self
857            .metrics
858            .checkpoint_db_commit_latency_transactions_chunks
859            .start_timer();
860        let transformation_guard = self
861            .metrics
862            .checkpoint_db_commit_latency_transactions_chunks_transformation
863            .start_timer();
864        let transactions = transactions
865            .iter()
866            .map(StoredTransaction::from)
867            .collect::<Vec<_>>();
868        drop(transformation_guard);
869
870        transactional_blocking_with_retry!(
871            &self.blocking_cp,
872            |conn| {
873                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
874                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
875                }
876                Ok::<(), IndexerError>(())
877            },
878            PG_DB_COMMIT_SLEEP_DURATION
879        )
880        .tap_ok(|_| {
881            let elapsed = guard.stop_and_record();
882            info!(
883                elapsed,
884                "Persisted {} chunked transactions",
885                transactions.len()
886            );
887        })
888        .tap_err(|e| {
889            tracing::error!("failed to persist transactions with error: {e}");
890        })
891    }
892
893    fn persist_tx_global_order_chunk(
894        &self,
895        tx_order: Vec<CheckpointTxGlobalOrder>,
896    ) -> Result<(), IndexerError> {
897        let guard = self
898            .metrics
899            .checkpoint_db_commit_latency_tx_insertion_order_chunks
900            .start_timer();
901
902        transactional_blocking_with_retry!(
903            &self.blocking_cp,
904            |conn| {
905                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
906                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
907                }
908                Ok::<(), IndexerError>(())
909            },
910            PG_DB_COMMIT_SLEEP_DURATION
911        )
912        .tap_ok(|_| {
913            let elapsed = guard.stop_and_record();
914            info!(
915                elapsed,
916                "Persisted {} chunked txs insertion order",
917                tx_order.len()
918            );
919        })
920        .tap_err(|e| {
921            tracing::error!("failed to persist txs insertion order with error: {e}");
922        })
923    }
924
925    /// We enforce index-status semantics for checkpointed transactions
926    /// in `tx_global_order`.
927    ///
928    /// Namely, checkpointed transactions (i.e. with `optimistic_sequence_number
929    /// == 0`) are updated to `optimistic_sequence_number == -1` to indicate
930    /// that they have been persisted in the database.
931    fn update_status_for_checkpoint_transactions_chunk(
932        &self,
933        tx_order: Vec<CheckpointTxGlobalOrder>,
934    ) -> Result<(), IndexerError> {
935        let guard = self
936            .metrics
937            .checkpoint_db_commit_latency_tx_insertion_order_chunks
938            .start_timer();
939
940        let num_transactions = tx_order.len();
941        transactional_blocking_with_retry!(
942            &self.blocking_cp,
943            |conn| {
944                on_conflict_do_update_with_condition!(
945                    tx_global_order::table,
946                    tx_order.clone(),
947                    tx_global_order::tx_digest,
948                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
949                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
950                    conn
951                );
952                on_conflict_do_update_with_condition!(
953                    tx_global_order::table,
954                    tx_order.clone(),
955                    tx_global_order::tx_digest,
956                    tx_global_order::chk_tx_sequence_number
957                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
958                    tx_global_order::chk_tx_sequence_number.is_null(),
959                    conn
960                );
961                Ok::<(), IndexerError>(())
962            },
963            PG_DB_COMMIT_SLEEP_DURATION
964        )
965        .tap_ok(|_| {
966            let elapsed = guard.stop_and_record();
967            info!(
968                elapsed,
969                "Updated {} chunked values of `tx_global_order`", num_transactions
970            );
971        })
972        .tap_err(|e| {
973            tracing::error!("failed to update `tx_global_order` with error: {e}");
974        })
975    }
976
977    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
978        let guard = self
979            .metrics
980            .checkpoint_db_commit_latency_events_chunks
981            .start_timer();
982        let len = events.len();
983        let events = events
984            .into_iter()
985            .map(StoredEvent::from)
986            .collect::<Vec<_>>();
987
988        transactional_blocking_with_retry!(
989            &self.blocking_cp,
990            |conn| {
991                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
992                    insert_or_ignore_into!(events::table, event_chunk, conn);
993                }
994                Ok::<(), IndexerError>(())
995            },
996            PG_DB_COMMIT_SLEEP_DURATION
997        )
998        .tap_ok(|_| {
999            let elapsed = guard.stop_and_record();
1000            info!(elapsed, "Persisted {} chunked events", len);
1001        })
1002        .tap_err(|e| {
1003            tracing::error!("failed to persist events with error: {e}");
1004        })
1005    }
1006
1007    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1008        if packages.is_empty() {
1009            return Ok(());
1010        }
1011        let guard = self
1012            .metrics
1013            .checkpoint_db_commit_latency_packages
1014            .start_timer();
1015        let packages = packages
1016            .into_iter()
1017            .map(StoredPackage::from)
1018            .collect::<Vec<_>>();
1019        transactional_blocking_with_retry!(
1020            &self.blocking_cp,
1021            |conn| {
1022                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1023                    on_conflict_do_update!(
1024                        packages::table,
1025                        packages_chunk,
1026                        packages::package_id,
1027                        (
1028                            packages::package_id.eq(excluded(packages::package_id)),
1029                            packages::move_package.eq(excluded(packages::move_package)),
1030                        ),
1031                        conn
1032                    );
1033                }
1034                Ok::<(), IndexerError>(())
1035            },
1036            PG_DB_COMMIT_SLEEP_DURATION
1037        )
1038        .tap_ok(|_| {
1039            let elapsed = guard.stop_and_record();
1040            info!(elapsed, "Persisted {} packages", packages.len());
1041        })
1042        .tap_err(|e| {
1043            tracing::error!("failed to persist packages with error: {e}");
1044        })
1045    }
1046
1047    async fn persist_event_indices_chunk(
1048        &self,
1049        indices: Vec<EventIndex>,
1050    ) -> Result<(), IndexerError> {
1051        let guard = self
1052            .metrics
1053            .checkpoint_db_commit_latency_event_indices_chunks
1054            .start_timer();
1055        let len = indices.len();
1056        let (
1057            event_emit_packages,
1058            event_emit_modules,
1059            event_senders,
1060            event_struct_packages,
1061            event_struct_modules,
1062            event_struct_names,
1063            event_struct_instantiations,
1064        ) = indices.into_iter().map(|i| i.split()).fold(
1065            (
1066                Vec::new(),
1067                Vec::new(),
1068                Vec::new(),
1069                Vec::new(),
1070                Vec::new(),
1071                Vec::new(),
1072                Vec::new(),
1073            ),
1074            |(
1075                mut event_emit_packages,
1076                mut event_emit_modules,
1077                mut event_senders,
1078                mut event_struct_packages,
1079                mut event_struct_modules,
1080                mut event_struct_names,
1081                mut event_struct_instantiations,
1082            ),
1083             index| {
1084                event_emit_packages.push(index.0);
1085                event_emit_modules.push(index.1);
1086                event_senders.push(index.2);
1087                event_struct_packages.push(index.3);
1088                event_struct_modules.push(index.4);
1089                event_struct_names.push(index.5);
1090                event_struct_instantiations.push(index.6);
1091                (
1092                    event_emit_packages,
1093                    event_emit_modules,
1094                    event_senders,
1095                    event_struct_packages,
1096                    event_struct_modules,
1097                    event_struct_names,
1098                    event_struct_instantiations,
1099                )
1100            },
1101        );
1102
1103        // Now persist all the event indices in parallel into their tables.
1104        let mut futures = vec![];
1105        futures.push(self.spawn_blocking_task(move |this| {
1106            persist_chunk_into_table!(
1107                event_emit_package::table,
1108                event_emit_packages,
1109                &this.blocking_cp
1110            )
1111        }));
1112
1113        futures.push(self.spawn_blocking_task(move |this| {
1114            persist_chunk_into_table!(
1115                event_emit_module::table,
1116                event_emit_modules,
1117                &this.blocking_cp
1118            )
1119        }));
1120
1121        futures.push(self.spawn_blocking_task(move |this| {
1122            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
1123        }));
1124
1125        futures.push(self.spawn_blocking_task(move |this| {
1126            persist_chunk_into_table!(
1127                event_struct_package::table,
1128                event_struct_packages,
1129                &this.blocking_cp
1130            )
1131        }));
1132
1133        futures.push(self.spawn_blocking_task(move |this| {
1134            persist_chunk_into_table!(
1135                event_struct_module::table,
1136                event_struct_modules,
1137                &this.blocking_cp
1138            )
1139        }));
1140
1141        futures.push(self.spawn_blocking_task(move |this| {
1142            persist_chunk_into_table!(
1143                event_struct_name::table,
1144                event_struct_names,
1145                &this.blocking_cp
1146            )
1147        }));
1148
1149        futures.push(self.spawn_blocking_task(move |this| {
1150            persist_chunk_into_table!(
1151                event_struct_instantiation::table,
1152                event_struct_instantiations,
1153                &this.blocking_cp
1154            )
1155        }));
1156
1157        futures::future::try_join_all(futures)
1158            .await
1159            .map_err(|e| {
1160                tracing::error!("failed to join event indices futures in a chunk: {e}");
1161                IndexerError::from(e)
1162            })?
1163            .into_iter()
1164            .collect::<Result<Vec<_>, _>>()
1165            .map_err(|e| {
1166                IndexerError::PostgresWrite(format!(
1167                    "Failed to persist all event indices in a chunk: {e:?}"
1168                ))
1169            })?;
1170        let elapsed = guard.stop_and_record();
1171        info!(elapsed, "Persisted {} chunked event indices", len);
1172        Ok(())
1173    }
1174
1175    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1176        let guard = self
1177            .metrics
1178            .checkpoint_db_commit_latency_tx_indices_chunks
1179            .start_timer();
1180        let len = indices.len();
1181
1182        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();
1183
1184        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
1185        let recipients: Vec<_> = splits
1186            .iter()
1187            .flat_map(|ix| ix.tx_recipients.clone())
1188            .collect();
1189        let input_objects: Vec<_> = splits
1190            .iter()
1191            .flat_map(|ix| ix.tx_input_objects.clone())
1192            .collect();
1193        let changed_objects: Vec<_> = splits
1194            .iter()
1195            .flat_map(|ix| ix.tx_changed_objects.clone())
1196            .collect();
1197        let wrapped_or_deleted_objects: Vec<_> = splits
1198            .iter()
1199            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
1200            .collect();
1201        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
1202        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
1203        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
1204        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1205        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1206
1207        let futures = [
1208            self.spawn_blocking_task(move |this| {
1209                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1210            }),
1211            self.spawn_blocking_task(move |this| {
1212                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1213            }),
1214            self.spawn_blocking_task(move |this| {
1215                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1216            }),
1217            self.spawn_blocking_task(move |this| {
1218                persist_chunk_into_table!(
1219                    tx_changed_objects::table,
1220                    changed_objects,
1221                    &this.blocking_cp
1222                )
1223            }),
1224            self.spawn_blocking_task(move |this| {
1225                persist_chunk_into_table!(
1226                    tx_wrapped_or_deleted_objects::table,
1227                    wrapped_or_deleted_objects,
1228                    &this.blocking_cp
1229                )
1230            }),
1231            self.spawn_blocking_task(move |this| {
1232                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1233            }),
1234            self.spawn_blocking_task(move |this| {
1235                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1236            }),
1237            self.spawn_blocking_task(move |this| {
1238                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1239            }),
1240            self.spawn_blocking_task(move |this| {
1241                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1242            }),
1243            self.spawn_blocking_task(move |this| {
1244                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1245            }),
1246        ];
1247
1248        futures::future::try_join_all(futures)
1249            .await
1250            .map_err(|e| {
1251                tracing::error!("failed to join tx indices futures in a chunk: {e}");
1252                IndexerError::from(e)
1253            })?
1254            .into_iter()
1255            .collect::<Result<Vec<_>, _>>()
1256            .map_err(|e| {
1257                IndexerError::PostgresWrite(format!(
1258                    "Failed to persist all tx indices in a chunk: {e:?}"
1259                ))
1260            })?;
1261        let elapsed = guard.stop_and_record();
1262        info!(elapsed, "Persisted {} chunked tx_indices", len);
1263        Ok(())
1264    }
1265
1266    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1267        let guard = self
1268            .metrics
1269            .checkpoint_db_commit_latency_epoch
1270            .start_timer();
1271        let epoch_id = epoch.new_epoch.epoch;
1272
1273        transactional_blocking_with_retry!(
1274            &self.blocking_cp,
1275            |conn| {
1276                if let Some(last_epoch) = &epoch.last_epoch {
1277                    info!(last_epoch.epoch, "Persisting epoch end data.");
1278                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
1279                        .set(last_epoch)
1280                        .execute(conn)?;
1281                }
1282
1283                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
1284                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
1285                Ok::<(), IndexerError>(())
1286            },
1287            PG_DB_COMMIT_SLEEP_DURATION
1288        )
1289        .tap_ok(|_| {
1290            let elapsed = guard.stop_and_record();
1291            info!(elapsed, epoch_id, "Persisted epoch beginning info");
1292        })
1293        .tap_err(|e| {
1294            tracing::error!("failed to persist epoch with error: {e}");
1295        })
1296    }
1297
1298    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1299        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1300        // partition_0 has been created, so no need to advance it.
1301        if let Some(last_epoch_id) = last_epoch_id {
1302            let last_db_epoch: Option<StoredEpochInfo> =
1303                read_only_blocking!(&self.blocking_cp, |conn| {
1304                    epochs::table
1305                        .filter(epochs::epoch.eq(last_epoch_id))
1306                        .first::<StoredEpochInfo>(conn)
1307                        .optional()
1308                })
1309                .context("Failed to read last epoch from PostgresDB")?;
1310            if let Some(last_epoch) = last_db_epoch {
1311                let epoch_partition_data =
1312                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1313                let table_partitions = self.partition_manager.get_table_partitions()?;
1314                for (table, (_, last_partition)) in table_partitions {
1315                    // Only advance epoch partition for epoch partitioned tables.
1316                    if !self
1317                        .partition_manager
1318                        .get_strategy(&table)
1319                        .is_epoch_partitioned()
1320                    {
1321                        continue;
1322                    }
1323                    let guard = self.metrics.advance_epoch_latency.start_timer();
1324                    self.partition_manager.advance_epoch(
1325                        table.clone(),
1326                        last_partition,
1327                        &epoch_partition_data,
1328                    )?;
1329                    let elapsed = guard.stop_and_record();
1330                    info!(
1331                        elapsed,
1332                        "Advanced epoch partition {} for table {}",
1333                        last_partition,
1334                        table.clone()
1335                    );
1336                }
1337            } else {
1338                tracing::error!("last epoch: {last_epoch_id} from PostgresDB is None.");
1339            }
1340        }
1341
1342        Ok(())
1343    }
1344
1345    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
1346        transactional_blocking_with_retry!(
1347            &self.blocking_cp,
1348            |conn| {
1349                diesel::delete(
1350                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
1351                )
1352                .execute(conn)
1353                .map_err(IndexerError::from)
1354                .context("Failed to prune checkpoints table")?;
1355
1356                Ok::<(), IndexerError>(())
1357            },
1358            PG_DB_COMMIT_SLEEP_DURATION
1359        )
1360    }
1361
1362    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1363        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1364        transactional_blocking_with_retry!(
1365            &self.blocking_cp,
1366            |conn| {
1367                prune_tx_or_event_indice_table!(
1368                    event_emit_module,
1369                    conn,
1370                    min_tx,
1371                    max_tx,
1372                    "Failed to prune event_emit_module table"
1373                );
1374                prune_tx_or_event_indice_table!(
1375                    event_emit_package,
1376                    conn,
1377                    min_tx,
1378                    max_tx,
1379                    "Failed to prune event_emit_package table"
1380                );
1381                prune_tx_or_event_indice_table![
1382                    event_senders,
1383                    conn,
1384                    min_tx,
1385                    max_tx,
1386                    "Failed to prune event_senders table"
1387                ];
1388                prune_tx_or_event_indice_table![
1389                    event_struct_instantiation,
1390                    conn,
1391                    min_tx,
1392                    max_tx,
1393                    "Failed to prune event_struct_instantiation table"
1394                ];
1395                prune_tx_or_event_indice_table![
1396                    event_struct_module,
1397                    conn,
1398                    min_tx,
1399                    max_tx,
1400                    "Failed to prune event_struct_module table"
1401                ];
1402                prune_tx_or_event_indice_table![
1403                    event_struct_name,
1404                    conn,
1405                    min_tx,
1406                    max_tx,
1407                    "Failed to prune event_struct_name table"
1408                ];
1409                prune_tx_or_event_indice_table![
1410                    event_struct_package,
1411                    conn,
1412                    min_tx,
1413                    max_tx,
1414                    "Failed to prune event_struct_package table"
1415                ];
1416                Ok::<(), IndexerError>(())
1417            },
1418            PG_DB_COMMIT_SLEEP_DURATION
1419        )
1420    }
1421
1422    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1423        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1424        transactional_blocking_with_retry!(
1425            &self.blocking_cp,
1426            |conn| {
1427                prune_tx_or_event_indice_table!(
1428                    tx_senders,
1429                    conn,
1430                    min_tx,
1431                    max_tx,
1432                    "Failed to prune tx_senders table"
1433                );
1434                prune_tx_or_event_indice_table!(
1435                    tx_recipients,
1436                    conn,
1437                    min_tx,
1438                    max_tx,
1439                    "Failed to prune tx_recipients table"
1440                );
1441                prune_tx_or_event_indice_table![
1442                    tx_input_objects,
1443                    conn,
1444                    min_tx,
1445                    max_tx,
1446                    "Failed to prune tx_input_objects table"
1447                ];
1448                prune_tx_or_event_indice_table![
1449                    tx_changed_objects,
1450                    conn,
1451                    min_tx,
1452                    max_tx,
1453                    "Failed to prune tx_changed_objects table"
1454                ];
1455                prune_tx_or_event_indice_table![
1456                    tx_wrapped_or_deleted_objects,
1457                    conn,
1458                    min_tx,
1459                    max_tx,
1460                    "Failed to prune tx_wrapped_or_deleted_objects table"
1461                ];
1462                prune_tx_or_event_indice_table![
1463                    tx_calls_pkg,
1464                    conn,
1465                    min_tx,
1466                    max_tx,
1467                    "Failed to prune tx_calls_pkg table"
1468                ];
1469                prune_tx_or_event_indice_table![
1470                    tx_calls_mod,
1471                    conn,
1472                    min_tx,
1473                    max_tx,
1474                    "Failed to prune tx_calls_mod table"
1475                ];
1476                prune_tx_or_event_indice_table![
1477                    tx_calls_fun,
1478                    conn,
1479                    min_tx,
1480                    max_tx,
1481                    "Failed to prune tx_calls_fun table"
1482                ];
1483                prune_tx_or_event_indice_table![
1484                    tx_digests,
1485                    conn,
1486                    min_tx,
1487                    max_tx,
1488                    "Failed to prune tx_digests table"
1489                ];
1490                Ok::<(), IndexerError>(())
1491            },
1492            PG_DB_COMMIT_SLEEP_DURATION
1493        )
1494    }
1495
1496    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1497        transactional_blocking_with_retry!(
1498            &self.blocking_cp,
1499            |conn| {
1500                diesel::delete(
1501                    pruner_cp_watermark::table
1502                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1503                )
1504                .execute(conn)
1505                .map_err(IndexerError::from)
1506                .context("Failed to prune pruner_cp_watermark table")?;
1507                Ok::<(), IndexerError>(())
1508            },
1509            PG_DB_COMMIT_SLEEP_DURATION
1510        )
1511    }
1512
1513    fn get_network_total_transactions_by_end_of_epoch(
1514        &self,
1515        epoch: u64,
1516    ) -> Result<Option<u64>, IndexerError> {
1517        read_only_blocking!(&self.blocking_cp, |conn| {
1518            epochs::table
1519                .filter(epochs::epoch.eq(epoch as i64))
1520                .select(epochs::network_total_transactions)
1521                .get_result::<Option<i64>>(conn)
1522        })
1523        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
1524        .map(|option| option.map(|v| v as u64))
1525    }
1526
1527    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1528        transactional_blocking_with_retry!(
1529            &self.blocking_cp,
1530            |conn| {
1531                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1532                    .execute(conn)?;
1533                Ok::<(), IndexerError>(())
1534            },
1535            PG_DB_COMMIT_SLEEP_DURATION
1536        )
1537        .tap_ok(|_| {
1538            info!("Successfully refreshed participation_metrics");
1539        })
1540        .tap_err(|e| {
1541            tracing::error!("failed to refresh participation_metrics: {e}");
1542        })
1543    }
1544
1545    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1546    where
1547        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1548        R: Send + 'static,
1549    {
1550        let this = self.clone();
1551        let current_span = tracing::Span::current();
1552        tokio::task::spawn_blocking(move || {
1553            let _guard = current_span.enter();
1554            f(this)
1555        })
1556        .await
1557        .map_err(Into::into)
1558        .and_then(std::convert::identity)
1559    }
1560
1561    fn spawn_blocking_task<F, R>(
1562        &self,
1563        f: F,
1564    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1565    where
1566        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1567        R: Send + 'static,
1568    {
1569        let this = self.clone();
1570        let current_span = tracing::Span::current();
1571        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1572        tokio::task::spawn_blocking(move || {
1573            let _guard = current_span.enter();
1574            let _elapsed = guard.stop_and_record();
1575            f(this)
1576        })
1577    }
1578
1579    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1580    where
1581        F: FnOnce(Self) -> Fut + Send + 'static,
1582        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1583        R: Send + 'static,
1584    {
1585        let this = self.clone();
1586        tokio::task::spawn(async move { f(this).await })
1587    }
1588}
1589
1590#[async_trait]
1591impl IndexerStore for PgIndexerStore {
1592    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1593        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1594            .await
1595    }
1596
1597    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1598        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1599            .await
1600    }
1601
1602    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1603        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1604            .await
1605    }
1606
1607    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1608        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1609            .await
1610    }
1611
1612    async fn get_latest_object_snapshot_checkpoint_sequence_number(
1613        &self,
1614    ) -> Result<Option<u64>, IndexerError> {
1615        self.execute_in_blocking_worker(|this| {
1616            this.get_latest_object_snapshot_checkpoint_sequence_number()
1617        })
1618        .await
1619    }
1620
1621    fn persist_objects_in_existing_transaction(
1622        &self,
1623        conn: &mut PgConnection,
1624        object_changes: Vec<TransactionObjectChangesToCommit>,
1625    ) -> Result<(), IndexerError> {
1626        if object_changes.is_empty() {
1627            return Ok(());
1628        }
1629
1630        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1631        let object_mutations = indexed_mutations
1632            .into_iter()
1633            .map(StoredObject::from)
1634            .collect::<Vec<_>>();
1635        let object_deletions = indexed_deletions
1636            .into_iter()
1637            .map(StoredDeletedObject::from)
1638            .collect::<Vec<_>>();
1639
1640        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1641        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1642
1643        Ok(())
1644    }
1645
1646    async fn persist_objects_snapshot(
1647        &self,
1648        object_changes: Vec<TransactionObjectChangesToCommit>,
1649    ) -> Result<(), IndexerError> {
1650        if object_changes.is_empty() {
1651            return Ok(());
1652        }
1653        let guard = self
1654            .metrics
1655            .checkpoint_db_commit_latency_objects_snapshot
1656            .start_timer();
1657        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1658        let objects_snapshot = indexed_mutations
1659            .into_iter()
1660            .map(StoredObjectSnapshot::from)
1661            .chain(
1662                indexed_deletions
1663                    .into_iter()
1664                    .map(StoredObjectSnapshot::from),
1665            )
1666            .collect::<Vec<_>>();
1667        let len = objects_snapshot.len();
1668        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1669        let futures = chunks
1670            .into_iter()
1671            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1672
1673        futures::future::try_join_all(futures)
1674            .await
1675            .map_err(|e| {
1676                tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
1677                IndexerError::from(e)
1678            })?
1679            .into_iter()
1680            .collect::<Result<Vec<_>, _>>()
1681            .map_err(|e| {
1682                IndexerError::PostgresWrite(format!(
1683                    "Failed to persist all objects snapshot chunks: {e:?}"
1684                ))
1685            })?;
1686        let elapsed = guard.stop_and_record();
1687        info!(elapsed, "Persisted {} objects snapshot", len);
1688        Ok(())
1689    }
1690
1691    async fn persist_object_history(
1692        &self,
1693        object_changes: Vec<TransactionObjectChangesToCommit>,
1694    ) -> Result<(), IndexerError> {
1695        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1696            .map(|val| val.eq_ignore_ascii_case("true"))
1697            .unwrap_or(false);
1698        if skip_history {
1699            info!("skipping object history");
1700            return Ok(());
1701        }
1702
1703        if object_changes.is_empty() {
1704            return Ok(());
1705        }
1706        let objects = make_objects_history_to_commit(object_changes);
1707        let guard = self
1708            .metrics
1709            .checkpoint_db_commit_latency_objects_history
1710            .start_timer();
1711
1712        let len = objects.len();
1713        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1714        let futures = chunks
1715            .into_iter()
1716            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1717
1718        futures::future::try_join_all(futures)
1719            .await
1720            .map_err(|e| {
1721                tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
1722                IndexerError::from(e)
1723            })?
1724            .into_iter()
1725            .collect::<Result<Vec<_>, _>>()
1726            .map_err(|e| {
1727                IndexerError::PostgresWrite(format!(
1728                    "Failed to persist all objects history chunks: {e:?}"
1729                ))
1730            })?;
1731        let elapsed = guard.stop_and_record();
1732        info!(elapsed, "Persisted {} objects history", len);
1733        Ok(())
1734    }
1735
1736    async fn persist_object_versions(
1737        &self,
1738        object_versions: Vec<StoredObjectVersion>,
1739    ) -> Result<(), IndexerError> {
1740        if object_versions.is_empty() {
1741            return Ok(());
1742        }
1743
1744        let guard = self
1745            .metrics
1746            .checkpoint_db_commit_latency_objects_version
1747            .start_timer();
1748
1749        let object_versions_count = object_versions.len();
1750
1751        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1752        let futures = chunks
1753            .into_iter()
1754            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1755            .collect::<Vec<_>>();
1756
1757        futures::future::try_join_all(futures)
1758            .await
1759            .map_err(|e| {
1760                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
1761                IndexerError::from(e)
1762            })?
1763            .into_iter()
1764            .collect::<Result<Vec<_>, _>>()
1765            .map_err(|e| {
1766                IndexerError::PostgresWrite(format!(
1767                    "Failed to persist all objects version chunks: {e:?}"
1768                ))
1769            })?;
1770        let elapsed = guard.stop_and_record();
1771        info!(elapsed, "Persisted {object_versions_count} object versions");
1772        Ok(())
1773    }
1774
1775    async fn persist_checkpoints(
1776        &self,
1777        checkpoints: Vec<IndexedCheckpoint>,
1778    ) -> Result<(), IndexerError> {
1779        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1780            .await
1781    }
1782
1783    async fn persist_transactions(
1784        &self,
1785        transactions: Vec<IndexedTransaction>,
1786    ) -> Result<(), IndexerError> {
1787        let guard = self
1788            .metrics
1789            .checkpoint_db_commit_latency_transactions
1790            .start_timer();
1791        let len = transactions.len();
1792
1793        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1794        let futures = chunks
1795            .into_iter()
1796            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1797
1798        futures::future::try_join_all(futures)
1799            .await
1800            .map_err(|e| {
1801                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
1802                IndexerError::from(e)
1803            })?
1804            .into_iter()
1805            .collect::<Result<Vec<_>, _>>()
1806            .map_err(|e| {
1807                IndexerError::PostgresWrite(format!(
1808                    "Failed to persist all transactions chunks: {e:?}"
1809                ))
1810            })?;
1811        let elapsed = guard.stop_and_record();
1812        info!(elapsed, "Persisted {} transactions", len);
1813        Ok(())
1814    }
1815
1816    fn persist_optimistic_transaction_in_existing_transaction(
1817        &self,
1818        conn: &mut PgConnection,
1819        transaction: OptimisticTransaction,
1820    ) -> Result<(), IndexerError> {
1821        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1822        Ok(())
1823    }
1824
1825    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1826        if events.is_empty() {
1827            return Ok(());
1828        }
1829        let len = events.len();
1830        let guard = self
1831            .metrics
1832            .checkpoint_db_commit_latency_events
1833            .start_timer();
1834        let chunks = chunk!(events, self.config.parallel_chunk_size);
1835        let futures = chunks
1836            .into_iter()
1837            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1838
1839        futures::future::try_join_all(futures)
1840            .await
1841            .map_err(|e| {
1842                tracing::error!("failed to join persist_events_chunk futures: {e}");
1843                IndexerError::from(e)
1844            })?
1845            .into_iter()
1846            .collect::<Result<Vec<_>, _>>()
1847            .map_err(|e| {
1848                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
1849            })?;
1850        let elapsed = guard.stop_and_record();
1851        info!(elapsed, "Persisted {} events", len);
1852        Ok(())
1853    }
1854
1855    async fn persist_displays(
1856        &self,
1857        display_updates: BTreeMap<String, StoredDisplay>,
1858    ) -> Result<(), IndexerError> {
1859        if display_updates.is_empty() {
1860            return Ok(());
1861        }
1862
1863        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
1864            .await?
1865    }
1866
1867    fn persist_displays_in_existing_transaction(
1868        &self,
1869        conn: &mut PgConnection,
1870        display_updates: Vec<&StoredDisplay>,
1871    ) -> Result<(), IndexerError> {
1872        if display_updates.is_empty() {
1873            return Ok(());
1874        }
1875
1876        on_conflict_do_update_with_condition!(
1877            display::table,
1878            display_updates,
1879            display::object_type,
1880            (
1881                display::id.eq(excluded(display::id)),
1882                display::version.eq(excluded(display::version)),
1883                display::bcs.eq(excluded(display::bcs)),
1884            ),
1885            excluded(display::version).gt(display::version),
1886            conn
1887        );
1888
1889        Ok(())
1890    }
1891
1892    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1893        if packages.is_empty() {
1894            return Ok(());
1895        }
1896        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
1897            .await
1898    }
1899
1900    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
1901        if indices.is_empty() {
1902            return Ok(());
1903        }
1904        let len = indices.len();
1905        let guard = self
1906            .metrics
1907            .checkpoint_db_commit_latency_event_indices
1908            .start_timer();
1909        let chunks = chunk!(indices, self.config.parallel_chunk_size);
1910
1911        let futures = chunks.into_iter().map(|chunk| {
1912            self.spawn_task(move |this: Self| async move {
1913                this.persist_event_indices_chunk(chunk).await
1914            })
1915        });
1916
1917        futures::future::try_join_all(futures)
1918            .await
1919            .map_err(|e| {
1920                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
1921                IndexerError::from(e)
1922            })?
1923            .into_iter()
1924            .collect::<Result<Vec<_>, _>>()
1925            .map_err(|e| {
1926                IndexerError::PostgresWrite(format!(
1927                    "Failed to persist all event_indices chunks: {e:?}"
1928                ))
1929            })?;
1930        let elapsed = guard.stop_and_record();
1931        info!(elapsed, "Persisted {} event_indices chunks", len);
1932        Ok(())
1933    }
1934
1935    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1936        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
1937            .await
1938    }
1939
1940    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1941        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
1942            .await
1943    }
1944
1945    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
1946        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
1947            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
1948            _ => Err(IndexerError::PostgresRead(format!(
1949                "Failed to get checkpoint range for epoch {epoch}"
1950            ))),
1951        }?;
1952
1953        // NOTE: for disaster recovery, min_cp is the min cp of the current epoch, which
1954        // is likely partially pruned already. min_prunable_cp is the min cp to
1955        // be pruned. By std::cmp::max, we will resume the pruning process from
1956        // the next checkpoint, instead of the first cp of the current epoch.
1957        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
1958        min_cp = std::cmp::max(min_cp, min_prunable_cp);
1959        for cp in min_cp..=max_cp {
1960            // NOTE: the order of pruning tables is crucial:
1961            // 1. prune checkpoints table, checkpoints table is the source table of
1962            //    available range,
1963            // we prune it first to make sure that we always have full data for checkpoints
1964            // within the available range;
1965            // 2. then prune tx_* tables;
1966            // 3. then prune pruner_cp_watermark table, which is the checkpoint pruning
1967            //    watermark table and also tx seq source
1968            // of a checkpoint to prune tx_* tables;
1969            // 4. lastly we prune epochs table when all checkpoints of the epoch have been
1970            //    pruned.
1971            info!(
1972                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
1973                cp, epoch, min_prunable_cp
1974            );
1975            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
1976                .await
1977                .unwrap_or_else(|e| {
1978                    tracing::error!("failed to prune checkpoint {cp}: {e}");
1979                });
1980
1981            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
1982            self.execute_in_blocking_worker(move |this| {
1983                this.prune_tx_indices_table(min_tx, max_tx)
1984            })
1985            .await
1986            .unwrap_or_else(|e| {
1987                tracing::error!("failed to prune transactions for cp {cp}: {e}");
1988            });
1989            info!(
1990                "Pruned transactions for checkpoint {} from tx {} to tx {}",
1991                cp, min_tx, max_tx
1992            );
1993            self.execute_in_blocking_worker(move |this| {
1994                this.prune_event_indices_table(min_tx, max_tx)
1995            })
1996            .await
1997            .unwrap_or_else(|e| {
1998                tracing::error!("failed to prune events of transactions for cp {cp}: {e}");
1999            });
2000            info!(
2001                "Pruned events of transactions for checkpoint {cp} from tx {min_tx} to tx {max_tx}"
2002            );
2003            self.metrics.last_pruned_transaction.set(max_tx as i64);
2004
2005            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
2006                .await
2007                .unwrap_or_else(|e| {
2008                    tracing::error!("failed to prune pruner_cp_watermark table for cp {cp}: {e}");
2009                });
2010            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2011            self.metrics.last_pruned_checkpoint.set(cp as i64);
2012        }
2013
2014        Ok(())
2015    }
2016
2017    async fn get_network_total_transactions_by_end_of_epoch(
2018        &self,
2019        epoch: u64,
2020    ) -> Result<Option<u64>, IndexerError> {
2021        self.execute_in_blocking_worker(move |this| {
2022            this.get_network_total_transactions_by_end_of_epoch(epoch)
2023        })
2024        .await
2025    }
2026
2027    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2028        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2029            .await
2030    }
2031
2032    fn as_any(&self) -> &dyn StdAny {
2033        self
2034    }
2035
2036    /// Persist protocol configs and feature flags until the protocol version
2037    /// for the latest epoch we have stored in the db, inclusive.
2038    fn persist_protocol_configs_and_feature_flags(
2039        &self,
2040        chain_id: Vec<u8>,
2041    ) -> Result<(), IndexerError> {
2042        let chain_id = ChainIdentifier::from(
2043            CheckpointDigest::try_from(chain_id).expect("unable to convert chain id"),
2044        );
2045
2046        let mut all_configs = vec![];
2047        let mut all_flags = vec![];
2048
2049        let (start_version, end_version) = self.get_protocol_version_index_range()?;
2050        info!(
2051            "Persisting protocol configs with start_version: {}, end_version: {}",
2052            start_version, end_version
2053        );
2054
2055        // Gather all protocol configs and feature flags for all versions between start
2056        // and end.
2057        for version in start_version..=end_version {
2058            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2059                (version as u64).into(),
2060                chain_id.chain(),
2061            )
2062            .ok_or(IndexerError::Generic(format!(
2063                "Unable to fetch protocol version {} and chain {:?}",
2064                version,
2065                chain_id.chain()
2066            )))?;
2067            let configs_vec = protocol_configs
2068                .attr_map()
2069                .into_iter()
2070                .map(|(k, v)| StoredProtocolConfig {
2071                    protocol_version: version,
2072                    config_name: k,
2073                    config_value: v.map(|v| v.to_string()),
2074                })
2075                .collect::<Vec<_>>();
2076            all_configs.extend(configs_vec);
2077
2078            let feature_flags = protocol_configs
2079                .feature_map()
2080                .into_iter()
2081                .map(|(k, v)| StoredFeatureFlag {
2082                    protocol_version: version,
2083                    flag_name: k,
2084                    flag_value: v,
2085                })
2086                .collect::<Vec<_>>();
2087            all_flags.extend(feature_flags);
2088        }
2089
2090        transactional_blocking_with_retry!(
2091            &self.blocking_cp,
2092            |conn| {
2093                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2094                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2095                }
2096                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
2097                Ok::<(), IndexerError>(())
2098            },
2099            PG_DB_COMMIT_SLEEP_DURATION
2100        )?;
2101        Ok(())
2102    }
2103
2104    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2105        if indices.is_empty() {
2106            return Ok(());
2107        }
2108        let len = indices.len();
2109        let guard = self
2110            .metrics
2111            .checkpoint_db_commit_latency_tx_indices
2112            .start_timer();
2113        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2114
2115        let futures = chunks.into_iter().map(|chunk| {
2116            self.spawn_task(move |this: Self| async move {
2117                this.persist_tx_indices_chunk_v2(chunk).await
2118            })
2119        });
2120        futures::future::try_join_all(futures)
2121            .await
2122            .map_err(|e| {
2123                tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
2124                IndexerError::from(e)
2125            })?
2126            .into_iter()
2127            .collect::<Result<Vec<_>, _>>()
2128            .map_err(|e| {
2129                IndexerError::PostgresWrite(format!(
2130                    "Failed to persist all tx_indices chunks: {e:?}"
2131                ))
2132            })?;
2133        let elapsed = guard.stop_and_record();
2134        info!(elapsed, "Persisted {} tx_indices chunks", len);
2135        Ok(())
2136    }
2137
2138    async fn persist_checkpoint_objects(
2139        &self,
2140        objects: Vec<CheckpointObjectChanges>,
2141    ) -> Result<(), IndexerError> {
2142        if objects.is_empty() {
2143            return Ok(());
2144        }
2145        let guard = self
2146            .metrics
2147            .checkpoint_db_commit_latency_objects
2148            .start_timer();
2149        let CheckpointObjectChanges {
2150            changed_objects: mutations,
2151            deleted_objects: deletions,
2152        } = retain_latest_objects_from_checkpoint_batch(objects);
2153        let mutation_len = mutations.len();
2154        let deletion_len = deletions.len();
2155
2156        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
2157        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
2158        let mutation_futures = mutation_chunks
2159            .into_iter()
2160            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
2161        let deletion_futures = deletion_chunks
2162            .into_iter()
2163            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
2164        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
2165            .await
2166            .map_err(|e| {
2167                tracing::error!("failed to join futures for persisting objects: {e}");
2168                IndexerError::from(e)
2169            })?
2170            .into_iter()
2171            .collect::<Result<Vec<_>, _>>()
2172            .map_err(|e| {
2173                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}",))
2174            })?;
2175
2176        let elapsed = guard.stop_and_record();
2177        info!(
2178            elapsed,
2179            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
2180        );
2181        Ok(())
2182    }
2183
2184    async fn update_status_for_checkpoint_transactions(
2185        &self,
2186        tx_order: Vec<CheckpointTxGlobalOrder>,
2187    ) -> Result<(), IndexerError> {
2188        let guard = self
2189            .metrics
2190            .checkpoint_db_commit_latency_tx_insertion_order
2191            .start_timer();
2192        let len = tx_order.len();
2193
2194        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2195        let futures = chunks.into_iter().map(|c| {
2196            self.spawn_blocking_task(move |this| {
2197                this.update_status_for_checkpoint_transactions_chunk(c)
2198            })
2199        });
2200
2201        futures::future::try_join_all(futures)
2202            .await
2203            .map_err(|e| {
2204                tracing::error!(
2205                    "failed to join update_status_for_checkpoint_transactions_chunk futures: {e}",
2206                );
2207                IndexerError::from(e)
2208            })?
2209            .into_iter()
2210            .collect::<Result<Vec<_>, _>>()
2211            .map_err(|e| {
2212                IndexerError::PostgresWrite(format!(
2213                    "Failed to update all `tx_global_order` chunks: {e:?}",
2214                ))
2215            })?;
2216        let elapsed = guard.stop_and_record();
2217        info!(
2218            elapsed,
2219            "Updated index status for {len} txs insertion orders"
2220        );
2221        Ok(())
2222    }
2223
2224    async fn persist_tx_global_order(
2225        &self,
2226        tx_order: Vec<CheckpointTxGlobalOrder>,
2227    ) -> Result<(), IndexerError> {
2228        let guard = self
2229            .metrics
2230            .checkpoint_db_commit_latency_tx_insertion_order
2231            .start_timer();
2232        let len = tx_order.len();
2233
2234        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2235        let futures = chunks
2236            .into_iter()
2237            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));
2238
2239        futures::future::try_join_all(futures)
2240            .await
2241            .map_err(|e| {
2242                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}",);
2243                IndexerError::from(e)
2244            })?
2245            .into_iter()
2246            .collect::<Result<Vec<_>, _>>()
2247            .map_err(|e| {
2248                IndexerError::PostgresWrite(format!(
2249                    "Failed to persist all txs insertion order chunks: {e:?}",
2250                ))
2251            })?;
2252        let elapsed = guard.stop_and_record();
2253        info!(elapsed, "Persisted {len} txs insertion orders");
2254        Ok(())
2255    }
2256}
2257
2258fn make_objects_history_to_commit(
2259    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2260) -> Vec<StoredHistoryObject> {
2261    let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2262        .clone()
2263        .into_iter()
2264        .flat_map(|changes| changes.deleted_objects)
2265        .map(|o| o.into())
2266        .collect();
2267    let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2268        .into_iter()
2269        .flat_map(|changes| changes.changed_objects)
2270        .map(|o| o.into())
2271        .collect();
2272    deleted_objects.into_iter().chain(mutated_objects).collect()
2273}
2274
2275/// Partitions object changes into deletions and mutations.
2276///
2277/// Retains only the highest version of each object among deletions and
2278/// mutations. This allows concurrent insertion into the DB of the resulting
2279/// partitions.
2280fn retain_latest_indexed_objects(
2281    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2282) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2283    use std::collections::HashMap;
2284
2285    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
2286    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();
2287
2288    for change in tx_object_changes {
2289        // Remove mutation / deletion with a following deletion / mutation,
2290        // as we expect that following deletion / mutation has a higher version.
2291        // Technically, assertions below are not required, double check just in case.
2292        for mutation in change.changed_objects {
2293            let id = mutation.object.id();
2294            let version = mutation.object.version();
2295
2296            if let Some(existing) = deletions.remove(&id) {
2297                assert!(
2298                    existing.object_version < version.value(),
2299                    "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2300                    existing.object_version
2301                );
2302            }
2303
2304            if let Some(existing) = mutations.insert(id, mutation) {
2305                assert!(
2306                    existing.object.version() < version,
2307                    "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2308                    existing.object.version()
2309                );
2310            }
2311        }
2312        // Handle deleted objects
2313        for deletion in change.deleted_objects {
2314            let id = deletion.object_id;
2315            let version = deletion.object_version;
2316
2317            if let Some(existing) = mutations.remove(&id) {
2318                assert!(
2319                    existing.object.version().value() < version,
2320                    "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2321                    existing.object.version(),
2322                );
2323            }
2324
2325            if let Some(existing) = deletions.insert(id, deletion) {
2326                assert!(
2327                    existing.object_version < version,
2328                    "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2329                    existing.object_version
2330                );
2331            }
2332        }
2333    }
2334
2335    (
2336        mutations.into_values().collect(),
2337        deletions.into_values().collect(),
2338    )
2339}