iota_indexer/store/pg_indexer_store.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
use core::result::Result::Ok;
use std::{
    any::Any as StdAny,
    collections::{BTreeMap, HashMap},
    time::Duration,
};

use async_trait::async_trait;
use diesel::{
    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
    dsl::{max, min, sql},
    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
    upsert::excluded,
};
use downcast::Any;
use iota_protocol_config::ProtocolConfig;
use iota_types::{
    base_types::ObjectID,
    digests::{ChainIdentifier, CheckpointDigest},
    messages_checkpoint::CheckpointSequenceNumber,
};
use itertools::Itertools;
use strum::IntoEnumIterator;
use tap::TapFallible;
use tracing::info;

use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
use crate::{
    blocking_call_is_ok_or_panic,
    db::ConnectionPool,
    errors::{Context, IndexerError, IndexerResult},
    ingestion::{
        common::{
            persist::{CommitterWatermark, ObjectsSnapshotHandlerTables},
            prepare::{
                CheckpointObjectChanges, LiveObject, RemovedObject,
                retain_latest_objects_from_checkpoint_batch,
            },
        },
        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
    },
    insert_or_ignore_into,
    metrics::IndexerMetrics,
    models::{
        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
        display::StoredDisplay,
        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
        events::StoredEvent,
        obj_indices::StoredObjectVersion,
        objects::{
            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
            StoredObjects,
        },
        packages::StoredPackage,
        transactions::{
            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
        },
        tx_indices::TxIndexSplit,
        watermarks::StoredWatermark,
    },
    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
    persist_chunk_into_table_in_existing_connection,
    pruning::pruner::PrunableTable,
    read_only_blocking, run_query, run_query_with_retry,
    schema::{
        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
        tx_wrapped_or_deleted_objects, watermarks,
    },
    store::IndexerStore,
    transactional_blocking_with_retry,
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, TxIndex,
    },
};

/// A cursor representing the global order position of a transaction according
/// to the `tx_global_order` table.
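///
/// Illustrative ordering: the pruner compares cursors as
/// `(global_sequence_number, optimistic_sequence_number)` pairs, so a cursor
/// `(5, 0)` precedes `(5, 1)`, which in turn precedes `(6, 0)`.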
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TxGlobalOrderCursor {
    pub global_sequence_number: i64,
    pub optimistic_sequence_number: i64,
}

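/// Splits `$data` into `Vec`s of at most `$size` elements; the final chunk may
/// be shorter. Illustrative example: `chunk!(vec![1, 2, 3, 4, 5], 2)` evaluates
/// to `vec![vec![1, 2], vec![3, 4], vec![5]]`.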
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}

macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}
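// Illustrative expansion: a hypothetical call such as
// `prune_tx_or_event_indice_table!(tx_senders, conn, 0, 99, "msg")` expands to
// a `diesel::delete` that removes every `tx_senders` row whose
// `tx_sequence_number` lies in `0..=99`, propagating errors via `?`.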
// Within one DB transaction, the update may be chunked into several
// statements; this is the number of rows to update in one statement.
// TODO: with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` may now
// be less relevant. We should run experiments and remove it if that's confirmed.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
// The number of rows to update in one DB transaction.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
// The number of rows to update in one DB transaction, specifically for objects.
// Setting this number too high may cause DB deadlocks because of optimistic
// locking.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);

#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    pub parallel_chunk_size: usize,
    pub parallel_objects_chunk_size: usize,
}

pub struct PgIndexerStore {
    blocking_cp: ConnectionPool,
    metrics: IndexerMetrics,
    partition_manager: PgPartitionManager,
    config: PgIndexerStoreConfig,
}

impl Clone for PgIndexerStore {
    fn clone(&self) -> PgIndexerStore {
        Self {
            blocking_cp: self.blocking_cp.clone(),
            metrics: self.metrics.clone(),
            partition_manager: self.partition_manager.clone(),
            config: self.config.clone(),
        }
    }
}

impl PgIndexerStore {
    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
            .expect("failed to initialize partition manager");
        let config = PgIndexerStoreConfig {
            parallel_chunk_size,
            parallel_objects_chunk_size,
        };

        Self {
            blocking_cp,
            metrics,
            partition_manager,
            config,
        }
    }
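
    // Construction sketch (hypothetical `pool` and `metrics` values):
    //
    //     let store = PgIndexerStore::new(pool.clone(), metrics.clone());
    //
    // The chunk sizes may be overridden at startup through the
    // `PG_COMMIT_PARALLEL_CHUNK_SIZE` and `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE`
    // environment variables; note that a malformed value panics in `parse()`.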

    pub fn get_metrics(&self) -> IndexerMetrics {
        self.metrics.clone()
    }

    pub fn blocking_cp(&self) -> ConnectionPool {
        self.blocking_cp.clone()
    }

    pub(crate) async fn get_latest_epoch_id_in_blocking_worker(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_latest_epoch_id())
            .await
    }

    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::epoch))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest epoch id from PostgresDB")
    }

    /// Get the range of the protocol versions that need to be indexed.
    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        // We start indexing from the next protocol version after the latest one stored
        // in the db.
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        // We end indexing at the protocol version of the latest epoch stored in the db.
        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }
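
    // Worked example for `get_protocol_version_index_range` (illustrative): if
    // protocol configs are stored up to version 5 and the latest epoch runs
    // protocol version 7, the returned range is `(6, 7)`; on an empty database
    // it is `(1, 1)`.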

    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }

    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select(max(checkpoints::sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest checkpoint sequence number from PostgresDB")
    }

    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select((
                    min(checkpoints::sequence_number),
                    max(checkpoints::sequence_number),
                ))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
    }

    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((min(epochs::epoch), max(epochs::epoch)))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max epoch numbers from PostgresDB")
    }

    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.unwrap_or_default() as u64)
        })
        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
    }

    fn get_checkpoint_range_for_epoch(
        &self,
        epoch: u64,
    ) -> Result<(u64, Option<u64>), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
                .filter(epochs::epoch.eq(epoch as i64))
                .first::<(i64, Option<i64>)>(conn)
                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
        })
        .context(
            format!("failed reading checkpoint range from PostgresDB for epoch {epoch}").as_str(),
        )
    }

    fn get_transaction_range_for_checkpoint(
        &self,
        checkpoint: u64,
    ) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select((
                    pruner_cp_watermark::min_tx_sequence_number,
                    pruner_cp_watermark::max_tx_sequence_number,
                ))
                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
                .first::<(i64, i64)>(conn)
                .map(|(min, max)| (min as u64, max as u64))
        })
        .context(
            format!("failed reading transaction range from PostgresDB for checkpoint {checkpoint}")
                .as_str(),
        )
    }

    pub(crate) async fn get_global_order_for_tx_seq_in_blocking_worker(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_global_order_for_tx_seq(tx_seq))
            .await
    }

    fn get_global_order_for_tx_seq(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        let result = read_only_blocking!(&self.blocking_cp, |conn| {
            tx_global_order::dsl::tx_global_order
                .select((
                    tx_global_order::global_sequence_number,
                    tx_global_order::optimistic_sequence_number,
                ))
                .filter(tx_global_order::chk_tx_sequence_number.eq(tx_seq))
                .first::<(i64, i64)>(conn)
        })
        .context(
            format!("failed reading global sequence number from PostgresDB for tx seq {tx_seq}")
                .as_str(),
        )?;
        let (global_sequence_number, optimistic_sequence_number) = result;
        Ok(TxGlobalOrderCursor {
            global_sequence_number,
            optimistic_sequence_number,
        })
    }

    pub(crate) async fn prune_optimistic_transactions_up_to_in_blocking_worker(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        self.execute_in_blocking_worker(move |this| {
            this.prune_optimistic_transactions_up_to(to, limit)
        })
        .await
    }

    fn prune_optimistic_transactions_up_to(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                let sql = r#"
                    WITH ids_to_delete AS (
                         SELECT global_sequence_number, optimistic_sequence_number
                         FROM optimistic_transactions
                         WHERE (global_sequence_number, optimistic_sequence_number) <= ($1, $2)
                         ORDER BY global_sequence_number, optimistic_sequence_number
                         FOR UPDATE LIMIT $3
                     )
                     DELETE FROM optimistic_transactions otx
                     USING ids_to_delete
                     WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
                           (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
                "#;
                diesel::sql_query(sql)
                    .bind::<BigInt, _>(to.global_sequence_number)
                    .bind::<BigInt, _>(to.optimistic_sequence_number)
                    .bind::<BigInt, _>(limit)
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context(
                        format!("failed to prune optimistic_transactions table to {to:?} with limit {limit}").as_str(),
                    )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
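
    // Note on the pruning query above: Postgres compares the row value
    // `(global_sequence_number, optimistic_sequence_number)` lexicographically,
    // so e.g. `(4, 9) <= (5, 0)` holds, and `FOR UPDATE` locks the selected
    // rows so that concurrent pruners do not race to delete the same batch.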

    fn get_latest_object_snapshot_watermark(
        &self,
    ) -> Result<Option<CommitterWatermark>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            watermarks::table
                .select((
                    watermarks::epoch_hi_inclusive,
                    watermarks::checkpoint_hi_inclusive,
                    watermarks::tx_hi,
                ))
                .filter(
                    watermarks::entity
                        .eq(ObjectsSnapshotHandlerTables::ObjectsSnapshot.to_string()),
                )
                .first::<(i64, i64, i64)>(conn)
                // Handle case where the watermark is not set yet
                .optional()
                .map(|v| {
                    v.map(|(epoch, cp, tx)| CommitterWatermark {
                        epoch_hi_inclusive: epoch as u64,
                        checkpoint_hi_inclusive: cp as u64,
                        tx_hi: tx as u64,
                    })
                })
        })
        .context("Failed reading latest object snapshot watermark from PostgresDB")
    }

    fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            objects_snapshot::table
                .select(max(objects_snapshot::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as CheckpointSequenceNumber))
        })
        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
    }

    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            {
                let value = display_updates.values().collect::<Vec<_>>();
                |conn| self.persist_displays_in_existing_transaction(conn, value)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }

    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            INSERT INTO objects (
                object_id,
                object_version,
                object_digest,
                owner_type,
                owner_id,
                object_type,
                object_type_package,
                object_type_module,
                object_type_name,
                serialized_object,
                coin_type,
                coin_balance,
                df_kind
            )
            SELECT
                u.object_id,
                u.object_version,
                u.object_digest,
                u.owner_type,
                u.owner_id,
                u.object_type,
                u.object_type_package,
                u.object_type_module,
                u.object_type_name,
                u.serialized_object,
                u.coin_type,
                u.coin_balance,
                u.df_kind
            FROM UNNEST(
                $1::BYTEA[],
                $2::BIGINT[],
                $3::BYTEA[],
                $4::SMALLINT[],
                $5::BYTEA[],
                $6::TEXT[],
                $7::BYTEA[],
                $8::TEXT[],
                $9::TEXT[],
                $10::BYTEA[],
                $11::TEXT[],
                $12::BIGINT[],
                $13::SMALLINT[],
                $14::BYTEA[]
            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest)
            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            ON CONFLICT (object_id) DO UPDATE
            SET
                object_version = EXCLUDED.object_version,
                object_digest = EXCLUDED.object_digest,
                owner_type = EXCLUDED.owner_type,
                owner_id = EXCLUDED.owner_id,
                object_type = EXCLUDED.object_type,
                object_type_package = EXCLUDED.object_type_package,
                object_type_module = EXCLUDED.object_type_module,
                object_type_name = EXCLUDED.object_type_name,
                serialized_object = EXCLUDED.serialized_object,
                coin_type = EXCLUDED.coin_type,
                coin_balance = EXCLUDED.coin_balance,
                df_kind = EXCLUDED.df_kind
        "#;
        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
            .into_iter()
            .map(LiveObject::split)
            .map(|(indexed_object, tx_digest)| {
                (
                    StoredObject::from(indexed_object),
                    tx_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(objects.object_ids)
            .bind::<Array<BigInt>, _>(objects.object_versions)
            .bind::<Array<Bytea>, _>(objects.object_digests)
            .bind::<Array<SmallInt>, _>(objects.owner_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
            .bind::<Array<Bytea>, _>(objects.serialized_objects)
            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object mutations with error: {e}");
        })
    }
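
    // Note on the insert query above: the LEFT JOIN combined with the
    // `optimistic_sequence_number IS NULL OR ... = 0` filter appears to skip
    // object changes whose transaction is recorded in `tx_global_order` with a
    // nonzero optimistic sequence number, i.e. changes already covered by the
    // optimistic ingestion path.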

    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            DELETE FROM objects
            WHERE object_id IN (
                SELECT u.object_id
                FROM UNNEST(
                    $1::BYTEA[],
                    $2::BYTEA[]
                ) AS u(object_id, tx_digest)
                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            )
        "#;
        let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
            .into_iter()
            .map(|removed_object| {
                (
                    removed_object.object_id().to_vec(),
                    removed_object.transaction_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(object_ids)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object deletions with error: {e}");
        })
    }

    fn persist_object_mutation_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        on_conflict_do_update!(
            objects::table,
            mutated_object_mutation_chunk,
            objects::object_id,
            (
                objects::object_id.eq(excluded(objects::object_id)),
                objects::object_version.eq(excluded(objects::object_version)),
                objects::object_digest.eq(excluded(objects::object_digest)),
                objects::owner_type.eq(excluded(objects::owner_type)),
                objects::owner_id.eq(excluded(objects::owner_id)),
                objects::object_type.eq(excluded(objects::object_type)),
                objects::serialized_object.eq(excluded(objects::serialized_object)),
                objects::coin_type.eq(excluded(objects::coin_type)),
                objects::coin_balance.eq(excluded(objects::coin_balance)),
                objects::df_kind.eq(excluded(objects::df_kind)),
            ),
            conn
        );
        Ok::<(), IndexerError>(())
    }

    fn persist_object_deletion_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        diesel::delete(
            objects::table.filter(
                objects::object_id.eq_any(
                    deleted_objects_chunk
                        .iter()
                        .map(|o| o.object_id.clone())
                        .collect::<Vec<_>>(),
                ),
            ),
        )
        .execute(conn)
        .map_err(IndexerError::from)
        .context("Failed to write object deletion to PostgresDB")?;

        Ok::<(), IndexerError>(())
    }

    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object snapshot with error: {e}");
        })
    }

    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object history with error: {e}");
        })
    }

    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object versions with error: {e}");
        })
    }

    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        // If the first checkpoint has sequence number 0, we need to persist the digest
        // as chain identifier.
        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics.max_committed_checkpoint_sequence_number.set(
                            stored_checkpoint.sequence_number,
                        );
                        self.metrics.committed_checkpoint_timestamp_ms.set(
                            stored_checkpoint.timestamp_ms,
                        );
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist checkpoints with error: {e}");
        })
    }

    fn persist_transactions_chunk(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks
            .start_timer();
        let transformation_guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks_transformation
            .start_timer();
        let transactions = transactions
            .iter()
            .map(StoredTransaction::from)
            .collect::<Vec<_>>();
        drop(transformation_guard);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked transactions",
                transactions.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist transactions with error: {e}");
        })
    }

    fn persist_tx_global_order_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist txs insertion order with error: {e}");
        })
    }

    /// We enforce index-status semantics for checkpointed transactions
    /// in `tx_global_order`.
    ///
    /// Namely, checkpointed transactions (i.e. with `optimistic_sequence_number
    /// == 0`) are updated to `optimistic_sequence_number == -1` to indicate
    /// that they have been persisted in the database.
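    ///
    /// Transition sketch (assuming `IndexStatus::Started == 0` and
    /// `IndexStatus::Completed == -1`): a row inserted by the checkpoint
    /// committer with `optimistic_sequence_number == 0` flips to `-1` once this
    /// update runs, while rows holding other values are left untouched.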
    fn update_status_for_checkpoint_transactions_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        let num_transactions = tx_order.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
                    conn
                );
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::chk_tx_sequence_number
                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
                    tx_global_order::chk_tx_sequence_number.is_null(),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Updated {} chunked values of `tx_global_order`", num_transactions
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to update `tx_global_order` with error: {e}");
        })
    }

    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events_chunks
            .start_timer();
        let len = events.len();
        let events = events
            .into_iter()
            .map(StoredEvent::from)
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(events::table, event_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked events", len);
        })
        .tap_err(|e| {
            tracing::error!("failed to persist events with error: {e}");
        })
    }

    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_packages
            .start_timer();
        let packages = packages
            .into_iter()
            .map(StoredPackage::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update!(
                        packages::table,
                        packages_chunk,
                        packages::package_id,
                        (
                            packages::package_id.eq(excluded(packages::package_id)),
                            packages::move_package.eq(excluded(packages::move_package)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} packages", packages.len());
        })
        .tap_err(|e| {
            tracing::error!("failed to persist packages with error: {e}");
        })
    }

    async fn persist_event_indices_chunk(
        &self,
        indices: Vec<EventIndex>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices_chunks
            .start_timer();
        let len = indices.len();
        let (
            event_emit_packages,
            event_emit_modules,
            event_senders,
            event_struct_packages,
            event_struct_modules,
            event_struct_names,
            event_struct_instantiations,
        ) = indices.into_iter().map(|i| i.split()).fold(
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            ),
            |(
                mut event_emit_packages,
                mut event_emit_modules,
                mut event_senders,
                mut event_struct_packages,
                mut event_struct_modules,
                mut event_struct_names,
                mut event_struct_instantiations,
            ),
             index| {
                event_emit_packages.push(index.0);
                event_emit_modules.push(index.1);
                event_senders.push(index.2);
                event_struct_packages.push(index.3);
                event_struct_modules.push(index.4);
                event_struct_names.push(index.5);
                event_struct_instantiations.push(index.6);
                (
                    event_emit_packages,
                    event_emit_modules,
                    event_senders,
                    event_struct_packages,
                    event_struct_modules,
                    event_struct_names,
                    event_struct_instantiations,
                )
            },
        );
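        // The fold above is effectively a seven-way unzip: each
        // `EventIndex::split` yields one row per event-index table, and the
        // accumulator gathers them into per-table `Vec`s for bulk insertion.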
1142
1143        // Now persist all the event indices in parallel into their tables.
1144        let mut futures = vec![];
1145        futures.push(self.spawn_blocking_task(move |this| {
1146            persist_chunk_into_table!(
1147                event_emit_package::table,
1148                event_emit_packages,
1149                &this.blocking_cp
1150            )
1151        }));
1152
1153        futures.push(self.spawn_blocking_task(move |this| {
1154            persist_chunk_into_table!(
1155                event_emit_module::table,
1156                event_emit_modules,
1157                &this.blocking_cp
1158            )
1159        }));
1160
1161        futures.push(self.spawn_blocking_task(move |this| {
1162            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
1163        }));
1164
1165        futures.push(self.spawn_blocking_task(move |this| {
1166            persist_chunk_into_table!(
1167                event_struct_package::table,
1168                event_struct_packages,
1169                &this.blocking_cp
1170            )
1171        }));
1172
1173        futures.push(self.spawn_blocking_task(move |this| {
1174            persist_chunk_into_table!(
1175                event_struct_module::table,
1176                event_struct_modules,
1177                &this.blocking_cp
1178            )
1179        }));
1180
1181        futures.push(self.spawn_blocking_task(move |this| {
1182            persist_chunk_into_table!(
1183                event_struct_name::table,
1184                event_struct_names,
1185                &this.blocking_cp
1186            )
1187        }));
1188
1189        futures.push(self.spawn_blocking_task(move |this| {
1190            persist_chunk_into_table!(
1191                event_struct_instantiation::table,
1192                event_struct_instantiations,
1193                &this.blocking_cp
1194            )
1195        }));
1196
1197        futures::future::try_join_all(futures)
1198            .await
1199            .map_err(|e| {
1200                tracing::error!("failed to join event indices futures in a chunk: {e}");
1201                IndexerError::from(e)
1202            })?
1203            .into_iter()
1204            .collect::<Result<Vec<_>, _>>()
1205            .map_err(|e| {
1206                IndexerError::PostgresWrite(format!(
1207                    "Failed to persist all event indices in a chunk: {e:?}"
1208                ))
1209            })?;
1210        let elapsed = guard.stop_and_record();
1211        info!(elapsed, "Persisted {} chunked event indices", len);
1212        Ok(())
1213    }
1214
1215    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1216        let guard = self
1217            .metrics
1218            .checkpoint_db_commit_latency_tx_indices_chunks
1219            .start_timer();
1220        let len = indices.len();
1221
1222        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();
1223
1224        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
1225        let recipients: Vec<_> = splits
1226            .iter()
1227            .flat_map(|ix| ix.tx_recipients.clone())
1228            .collect();
1229        let input_objects: Vec<_> = splits
1230            .iter()
1231            .flat_map(|ix| ix.tx_input_objects.clone())
1232            .collect();
1233        let changed_objects: Vec<_> = splits
1234            .iter()
1235            .flat_map(|ix| ix.tx_changed_objects.clone())
1236            .collect();
1237        let wrapped_or_deleted_objects: Vec<_> = splits
1238            .iter()
1239            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
1240            .collect();
1241        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
1242        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
1243        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
1244        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1245        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1246
1247        let futures = [
1248            self.spawn_blocking_task(move |this| {
1249                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1250            }),
1251            self.spawn_blocking_task(move |this| {
1252                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1253            }),
1254            self.spawn_blocking_task(move |this| {
1255                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1256            }),
1257            self.spawn_blocking_task(move |this| {
1258                persist_chunk_into_table!(
1259                    tx_changed_objects::table,
1260                    changed_objects,
1261                    &this.blocking_cp
1262                )
1263            }),
1264            self.spawn_blocking_task(move |this| {
1265                persist_chunk_into_table!(
1266                    tx_wrapped_or_deleted_objects::table,
1267                    wrapped_or_deleted_objects,
1268                    &this.blocking_cp
1269                )
1270            }),
1271            self.spawn_blocking_task(move |this| {
1272                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1273            }),
1274            self.spawn_blocking_task(move |this| {
1275                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1276            }),
1277            self.spawn_blocking_task(move |this| {
1278                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1279            }),
1280            self.spawn_blocking_task(move |this| {
1281                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1282            }),
1283            self.spawn_blocking_task(move |this| {
1284                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1285            }),
1286        ];
1287
1288        futures::future::try_join_all(futures)
1289            .await
1290            .map_err(|e| {
1291                tracing::error!("failed to join tx indices futures in a chunk: {e}");
1292                IndexerError::from(e)
1293            })?
1294            .into_iter()
1295            .collect::<Result<Vec<_>, _>>()
1296            .map_err(|e| {
1297                IndexerError::PostgresWrite(format!(
1298                    "Failed to persist all tx indices in a chunk: {e:?}"
1299                ))
1300            })?;
1301        let elapsed = guard.stop_and_record();
1302        info!(elapsed, "Persisted {} chunked tx_indices", len);
1303        Ok(())
1304    }
1305
1306    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1307        let guard = self
1308            .metrics
1309            .checkpoint_db_commit_latency_epoch
1310            .start_timer();
1311        let epoch_id = epoch.new_epoch.epoch;
1312
1313        transactional_blocking_with_retry!(
1314            &self.blocking_cp,
1315            |conn| {
1316                if let Some(last_epoch) = &epoch.last_epoch {
1317                    info!(last_epoch.epoch, "Persisting epoch end data.");
1318                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
1319                        .set(last_epoch)
1320                        .execute(conn)?;
1321                }
1322
1323                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
1324                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
1325                Ok::<(), IndexerError>(())
1326            },
1327            PG_DB_COMMIT_SLEEP_DURATION
1328        )
1329        .tap_ok(|_| {
1330            let elapsed = guard.stop_and_record();
1331            info!(elapsed, epoch_id, "Persisted epoch beginning info");
1332        })
1333        .tap_err(|e| {
1334            tracing::error!("failed to persist epoch with error: {e}");
1335        })
1336    }
1337
    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
        // partition_0 has been created, so no need to advance it.
        if let Some(last_epoch_id) = last_epoch_id {
            let last_db_epoch: Option<StoredEpochInfo> =
                read_only_blocking!(&self.blocking_cp, |conn| {
                    epochs::table
                        .filter(epochs::epoch.eq(last_epoch_id))
                        .first::<StoredEpochInfo>(conn)
                        .optional()
                })
                .context("Failed to read last epoch from PostgresDB")?;
            if let Some(last_epoch) = last_db_epoch {
                let epoch_partition_data =
                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
                let table_partitions = self.partition_manager.get_table_partitions()?;
                for (table, (_, last_partition)) in table_partitions {
                    // Only advance the partition for epoch-partitioned tables.
                    if !self
                        .partition_manager
                        .get_strategy(&table)
                        .is_epoch_partitioned()
                    {
                        continue;
                    }
                    let guard = self.metrics.advance_epoch_latency.start_timer();
                    self.partition_manager.advance_epoch(
                        table.clone(),
                        last_partition,
                        &epoch_partition_data,
                    )?;
                    let elapsed = guard.stop_and_record();
                    info!(
                        elapsed,
                        "Advanced epoch partition {} for table {}", last_partition, table
                    );
                }
            } else {
                tracing::error!("last epoch {last_epoch_id} was not found in PostgresDB");
            }
        }

        Ok(())
    }

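    /// Deletes the `checkpoints` row for checkpoint sequence number `cp`.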
    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

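    /// Deletes rows with tx sequence numbers in `[min_tx, max_tx]` from all
    /// event index tables.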
    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    event_emit_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_emit_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_package table"
                );
                prune_tx_or_event_indice_table!(
                    event_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_senders table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_instantiation,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_instantiation table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_name,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_name table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_package table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

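    /// Deletes rows with tx sequence numbers in `[min_tx, max_tx]` from all
    /// transaction index tables.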
    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    tx_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_senders table"
                );
                prune_tx_or_event_indice_table!(
                    tx_recipients,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_recipients table"
                );
                prune_tx_or_event_indice_table!(
                    tx_input_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_input_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_changed_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_changed_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_wrapped_or_deleted_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_wrapped_or_deleted_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_pkg,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_pkg table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_mod,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_mod table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_fun,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_fun table"
                );
                prune_tx_or_event_indice_table!(
                    tx_digests,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_digests table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

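    /// Deletes the `pruner_cp_watermark` row for checkpoint sequence number
    /// `cp`.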
    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    pruner_cp_watermark::table
                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune pruner_cp_watermark table")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::table
                .filter(epochs::epoch.eq(epoch as i64))
                .select(epochs::network_total_transactions)
                .get_result::<Option<i64>>(conn)
        })
        .context(format!("Failed to get network total transactions in epoch {epoch}").as_str())
        .map(|option| option.map(|v| v as u64))
    }

    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("failed to refresh participation_metrics: {e}");
        })
    }

    fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();

        let upper_bound_updates = E::iter()
            .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
            .collect::<Vec<_>>();

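        // Roughly, this upsert renders as the following SQL (a sketch; the
        // exact column list comes from `StoredWatermark`):
        //
        //   INSERT INTO watermarks (entity, epoch_hi_inclusive, ...)
        //   VALUES (...), (...)
        //   ON CONFLICT (entity) DO UPDATE SET
        //       epoch_hi_inclusive = EXCLUDED.epoch_hi_inclusive,
        //       checkpoint_hi_inclusive = EXCLUDED.checkpoint_hi_inclusive,
        //       tx_hi = EXCLUDED.tx_hi;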
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&upper_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
                        watermarks::checkpoint_hi_inclusive
                            .eq(excluded(watermarks::checkpoint_hi_inclusive)),
                        watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
                    ))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("Failed to update watermarks upper bound")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {e}");
        })
    }

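    /// Maps each of the given epochs to its first checkpoint sequence number
    /// and first transaction sequence number, as recorded in the `epochs`
    /// table.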
    fn map_epochs_to_cp_tx(
        &self,
        epochs: &[u64],
    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
        let pool = &self.blocking_cp;
        let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
            epochs::table
                .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
                .select((
                    epochs::epoch,
                    epochs::first_checkpoint_id,
                    epochs::first_tx_sequence_number,
                ))
                .load::<(i64, i64, i64)>(conn)
        })
        .context("Failed to fetch first checkpoint and tx seq num for epochs")?;

        Ok(results
            .into_iter()
            .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
            .collect())
    }

    fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
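        // `FilterDsl` is imported locally so that `.filter(...)` can attach
        // the `WHERE` conditions to the `ON CONFLICT ... DO UPDATE` statement
        // below.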
        use diesel::query_dsl::methods::FilterDsl;

        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
            .into_iter()
            .map(|(table, epoch)| {
                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
                    IndexerError::PersistentStorageDataCorruption(format!(
                        "epoch {epoch} not found in epoch mapping"
                    ))
                })?;
                Ok(StoredWatermark::from_lower_bound_update(
                    table.as_ref(),
                    epoch,
                    table.select_reader_lo(*checkpoint, *tx),
                ))
            })
            .collect();
        let lower_bound_updates = lookups?;
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();
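        // Roughly, the conditional upsert below renders as the following SQL
        // (a sketch; the timestamp comes from the EXTRACT expression that
        // appears twice in the builder):
        //
        //   INSERT INTO watermarks (...) VALUES (...)
        //   ON CONFLICT (entity) DO UPDATE SET
        //       reader_lo = EXCLUDED.reader_lo,
        //       epoch_lo = EXCLUDED.epoch_lo,
        //       timestamp_ms = <current epoch millis>
        //   WHERE EXCLUDED.reader_lo > watermarks.reader_lo
        //     AND EXCLUDED.epoch_lo > watermarks.epoch_lo
        //     AND <current epoch millis> > watermarks.timestamp_ms;
        //
        // That is, a lower bound only ever moves forward, and never with a
        // stale timestamp.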
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&lower_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
                        watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
                        watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )),
                    ))
                    .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
                    .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
                    .filter(
                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )
                        .gt(watermarks::timestamp_ms),
                    )
                    .execute(conn)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {e}");
        })?;
        Ok(())
    }

    fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        // Read with a retried read-only query; a write transaction here would
        // block, and be blocked by, concurrent writes to the same table.
        run_query_with_retry!(
            &self.blocking_cp,
            |conn| {
                let stored = watermarks::table
                    .load::<StoredWatermark>(conn)
                    .map_err(Into::into)
                    .context("Failed reading watermarks from PostgresDB")?;
                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                ))
                .get_result(conn)
                .map_err(Into::into)
                .context("Failed reading current timestamp from PostgresDB")?;
                Ok::<_, IndexerError>((stored, timestamp))
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

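    /// Runs a blocking closure against a clone of the store on tokio's
    /// blocking thread pool, propagating the current tracing span, and awaits
    /// its result.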
    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            f(this)
        })
        .await
        .map_err(Into::into)
        .and_then(std::convert::identity)
    }

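    /// Spawns a blocking closure against a clone of the store on tokio's
    /// blocking thread pool without awaiting it, recording how long the task
    /// waited before being scheduled.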
    fn spawn_blocking_task<F, R>(
        &self,
        f: F,
    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            let _elapsed = guard.stop_and_record();
            f(this)
        })
    }

    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        tokio::task::spawn(async move { f(this).await })
    }
}

#[async_trait]
impl IndexerStore for PgIndexerStore {
    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
            .await
    }

    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
            .await
    }

    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
            .await
    }

    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
            .await
    }

    async fn get_latest_object_snapshot_watermark(
        &self,
    ) -> Result<Option<CommitterWatermark>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_latest_object_snapshot_watermark())
            .await
    }

    async fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
        self.execute_in_blocking_worker(|this| {
            this.get_latest_object_snapshot_checkpoint_sequence_number()
        })
        .await
    }

    fn persist_objects_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }

        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let object_mutations = indexed_mutations
            .into_iter()
            .map(StoredObject::from)
            .collect::<Vec<_>>();
        let object_deletions = indexed_deletions
            .into_iter()
            .map(StoredDeletedObject::from)
            .collect::<Vec<_>>();

        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;

        Ok(())
    }

    async fn persist_objects_snapshot(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot
            .start_timer();
        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let objects_snapshot = indexed_mutations
            .into_iter()
            .map(StoredObjectSnapshot::from)
            .chain(
                indexed_deletions
                    .into_iter()
                    .map(StoredObjectSnapshot::from),
            )
            .collect::<Vec<_>>();
        let len = objects_snapshot.len();
        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects snapshot chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects snapshot rows", len);
        Ok(())
    }

    async fn persist_object_history(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
            .map(|val| val.eq_ignore_ascii_case("true"))
            .unwrap_or(false);
        if skip_history {
            info!("skipping object history");
            return Ok(());
        }

        if object_changes.is_empty() {
            return Ok(());
        }
        let objects = make_objects_history_to_commit(object_changes);
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history
            .start_timer();

        let len = objects.len();
        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects history chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects history rows", len);
        Ok(())
    }

    async fn persist_object_versions(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        if object_versions.is_empty() {
            return Ok(());
        }

        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version
            .start_timer();

        let object_versions_count = object_versions.len();

        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
            .collect::<Vec<_>>();

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects version chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {object_versions_count} object versions");
        Ok(())
    }

    async fn persist_checkpoints(
        &self,
        checkpoints: Vec<IndexedCheckpoint>,
    ) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
            .await
    }

    async fn persist_transactions(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions
            .start_timer();
        let len = transactions.len();

        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all transactions chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} transactions", len);
        Ok(())
    }

    fn persist_optimistic_transaction_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
        Ok(())
    }

    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        if events.is_empty() {
            return Ok(());
        }
        let len = events.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events
            .start_timer();
        let chunks = chunk!(events, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_events_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} events", len);
        Ok(())
    }

    async fn persist_displays(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
            .await?
    }

    fn persist_displays_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        display_updates: Vec<&StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        on_conflict_do_update_with_condition!(
            display::table,
            display_updates,
            display::object_type,
            (
                display::id.eq(excluded(display::id)),
                display::version.eq(excluded(display::version)),
                display::bcs.eq(excluded(display::bcs)),
            ),
            excluded(display::version).gt(display::version),
            conn
        );

        Ok(())
    }

    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
            .await
    }

    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_event_indices_chunk(chunk).await
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} event_indices", len);
        Ok(())
    }

    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
            .await
    }

    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
            .await
    }

    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
            _ => Err(IndexerError::PostgresRead(format!(
                "Failed to get checkpoint range for epoch {epoch}"
            ))),
        }?;

        // NOTE: for disaster recovery, min_cp is the min checkpoint of the current
        // epoch, which is likely partially pruned already, while min_prunable_cp is
        // the min checkpoint that remains to be pruned. Taking std::cmp::max resumes
        // pruning from the first unpruned checkpoint instead of re-pruning from the
        // first checkpoint of the current epoch.
        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
        min_cp = std::cmp::max(min_cp, min_prunable_cp);
        for cp in min_cp..=max_cp {
            // NOTE: the order of pruning tables is crucial:
            // 1. Prune the checkpoints table first; it is the source of the available
            //    range, so pruning it first ensures we always have full data for
            //    checkpoints within the available range.
            // 2. Then prune the tx_* tables.
            // 3. Then prune the pruner_cp_watermark table, which is the checkpoint
            //    pruning watermark and also the source of a checkpoint's tx sequence
            //    range used to prune the tx_* tables.
            // 4. Lastly, prune the epochs table once all checkpoints of the epoch
            //    have been pruned.
            info!(
                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
                cp, epoch, min_prunable_cp
            );
            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune checkpoint {cp}: {e}");
                });

            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
            self.execute_in_blocking_worker(move |this| {
                this.prune_tx_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune transactions for cp {cp}: {e}");
            });
            info!(
                "Pruned transactions for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.execute_in_blocking_worker(move |this| {
                this.prune_event_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune events of transactions for cp {cp}: {e}");
            });
            info!(
                "Pruned events of transactions for checkpoint {cp} from tx {min_tx} to tx {max_tx}"
            );
            self.metrics.last_pruned_transaction.set(max_tx as i64);

            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune pruner_cp_watermark table for cp {cp}: {e}");
                });
            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
            self.metrics.last_pruned_checkpoint.set(cp as i64);
        }

        Ok(())
    }

    async fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| {
            this.get_network_total_transactions_by_end_of_epoch(epoch)
        })
        .await
    }

    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
            .await
    }

    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        self.execute_in_blocking_worker(move |this| {
            this.update_watermarks_upper_bound::<E>(watermark)
        })
        .await
    }

    fn as_any(&self) -> &dyn StdAny {
        self
    }

    /// Persists protocol configs and feature flags up to the protocol version
    /// of the latest epoch we have stored in the db, inclusive.
    fn persist_protocol_configs_and_feature_flags(
        &self,
        chain_id: Vec<u8>,
    ) -> Result<(), IndexerError> {
        let chain_id = ChainIdentifier::from(
            CheckpointDigest::try_from(chain_id).expect("unable to convert chain id"),
        );

        let mut all_configs = vec![];
        let mut all_flags = vec![];

        let (start_version, end_version) = self.get_protocol_version_index_range()?;
        info!(
            "Persisting protocol configs with start_version: {}, end_version: {}",
            start_version, end_version
        );

        // Gather all protocol configs and feature flags for all versions between
        // start and end.
        for version in start_version..=end_version {
            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
                (version as u64).into(),
                chain_id.chain(),
            )
            .ok_or_else(|| {
                IndexerError::Generic(format!(
                    "Unable to fetch protocol version {} and chain {:?}",
                    version,
                    chain_id.chain()
                ))
            })?;
            let configs_vec = protocol_configs
                .attr_map()
                .into_iter()
                .map(|(k, v)| StoredProtocolConfig {
                    protocol_version: version,
                    config_name: k,
                    config_value: v.map(|v| v.to_string()),
                })
                .collect::<Vec<_>>();
            all_configs.extend(configs_vec);

            let feature_flags = protocol_configs
                .feature_map()
                .into_iter()
                .map(|(k, v)| StoredFeatureFlag {
                    protocol_version: version,
                    flag_name: k,
                    flag_value: v,
                })
                .collect::<Vec<_>>();
            all_flags.extend(feature_flags);
        }

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
                }
                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;
        Ok(())
    }

    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_tx_indices_chunk_v2(chunk).await
            })
        });
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} tx_indices", len);
        Ok(())
    }

    async fn persist_checkpoint_objects(
        &self,
        objects: Vec<CheckpointObjectChanges>,
    ) -> Result<(), IndexerError> {
        if objects.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects
            .start_timer();
        let CheckpointObjectChanges {
            changed_objects: mutations,
            deleted_objects: deletions,
        } = retain_latest_objects_from_checkpoint_batch(objects);
        let mutation_len = mutations.len();
        let deletion_len = deletions.len();

        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
        let mutation_futures = mutation_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
        let deletion_futures = deletion_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
            .await
            .map_err(|e| {
                tracing::error!("failed to join futures for persisting objects: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}"))
            })?;

        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
        );
        Ok(())
    }

    async fn update_status_for_checkpoint_transactions(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks.into_iter().map(|c| {
            self.spawn_blocking_task(move |this| {
                this.update_status_for_checkpoint_transactions_chunk(c)
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "failed to join update_status_for_checkpoint_transactions_chunk futures: {e}"
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to update all `tx_global_order` chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Updated index status for {len} txs insertion orders"
        );
        Ok(())
    }

    async fn persist_tx_global_order(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all txs insertion order chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {len} txs insertion orders");
        Ok(())
    }

    async fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
            .await
    }

    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_watermarks())
            .await
    }
}

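/// Flattens per-transaction object changes into `StoredHistoryObject` rows,
/// listing all deleted objects first, followed by all mutated objects.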
fn make_objects_history_to_commit(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Vec<StoredHistoryObject> {
    let mut deleted_objects: Vec<StoredHistoryObject> = vec![];
    let mut mutated_objects: Vec<StoredHistoryObject> = vec![];
    // Consume the changes once instead of cloning the whole batch.
    for changes in tx_object_changes {
        deleted_objects.extend(changes.deleted_objects.into_iter().map(Into::into));
        mutated_objects.extend(changes.changed_objects.into_iter().map(Into::into));
    }
    deleted_objects.into_iter().chain(mutated_objects).collect()
}

/// Partitions object changes into deletions and mutations.
///
/// Retains only the highest version of each object among deletions and
/// mutations. This allows concurrent insertion into the DB of the resulting
/// partitions.
fn retain_latest_indexed_objects(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();

    for change in tx_object_changes {
        // A later mutation / deletion supersedes an earlier deletion / mutation of
        // the same object, since it must carry a higher version. The assertions
        // below are not strictly required; they double-check that invariant.
        for mutation in change.changed_objects {
            let id = mutation.object.id();
            let version = mutation.object.version();

            if let Some(existing) = deletions.remove(&id) {
                assert!(
                    existing.object_version < version.value(),
                    "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }

            if let Some(existing) = mutations.insert(id, mutation) {
                assert!(
                    existing.object.version() < version,
                    "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version()
                );
            }
        }
        // Handle deleted objects.
        for deletion in change.deleted_objects {
            let id = deletion.object_id;
            let version = deletion.object_version;

            if let Some(existing) = mutations.remove(&id) {
                assert!(
                    existing.object.version().value() < version,
                    "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version(),
                );
            }

            if let Some(existing) = deletions.insert(id, deletion) {
                assert!(
                    existing.object_version < version,
                    "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }
        }
    }

    (
        mutations.into_values().collect(),
        deletions.into_values().collect(),
    )
}
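
// A minimal, self-contained sketch of the retain-latest merge semantics used
// by `retain_latest_indexed_objects` above, on simplified stand-in types (the
// real `IndexedObject` / `IndexedDeletedObject` carry far more data). It only
// illustrates the technique and is not exercised by the indexer itself.
#[cfg(test)]
mod retain_latest_sketch {
    use std::collections::HashMap;

    #[derive(Debug, Clone, Copy, PartialEq)]
    struct Change {
        id: u8,
        version: u64,
        deleted: bool,
    }

    /// Keeps only the highest-version change per object id, partitioned into
    /// (mutations, deletions), mirroring the merge above.
    fn retain_latest(changes: Vec<Change>) -> (Vec<Change>, Vec<Change>) {
        let mut mutations = HashMap::new();
        let mut deletions = HashMap::new();
        for change in changes {
            if change.deleted {
                // A deletion supersedes an earlier mutation of the same object.
                mutations.remove(&change.id);
                deletions.insert(change.id, change);
            } else {
                // A mutation supersedes an earlier deletion (e.g. unwrap).
                deletions.remove(&change.id);
                mutations.insert(change.id, change);
            }
        }
        (
            mutations.into_values().collect(),
            deletions.into_values().collect(),
        )
    }

    #[test]
    fn later_changes_supersede_earlier_ones() {
        let changes = vec![
            Change { id: 1, version: 1, deleted: false },
            Change { id: 1, version: 2, deleted: true }, // supersedes the mutation
            Change { id: 2, version: 5, deleted: false },
        ];
        let (mutations, deletions) = retain_latest(changes);
        assert_eq!(mutations, vec![Change { id: 2, version: 5, deleted: false }]);
        assert_eq!(deletions, vec![Change { id: 1, version: 2, deleted: true }]);
    }
}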