iota_indexer/store/
pg_indexer_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use core::result::Result::Ok;
6use std::{
7    any::Any as StdAny,
8    collections::{BTreeMap, HashMap},
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use diesel::{
14    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
15    dsl::{max, min, sql},
16    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
17    upsert::excluded,
18};
19use downcast::Any;
20use iota_protocol_config::ProtocolConfig;
21use iota_types::{
22    base_types::ObjectID,
23    digests::{ChainIdentifier, CheckpointDigest},
24    messages_checkpoint::CheckpointSequenceNumber,
25};
26use itertools::Itertools;
27use strum::IntoEnumIterator;
28use tap::TapFallible;
29use tracing::info;
30
31use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
32use crate::{
33    blocking_call_is_ok_or_panic,
34    db::ConnectionPool,
35    errors::{Context, IndexerError, IndexerResult},
36    ingestion::{
37        common::{
38            persist::{CommitterWatermark, ObjectsSnapshotHandlerTables},
39            prepare::{
40                CheckpointObjectChanges, LiveObject, RemovedObject,
41                retain_latest_objects_from_checkpoint_batch,
42            },
43        },
44        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
45    },
46    insert_or_ignore_into,
47    metrics::IndexerMetrics,
48    models::{
49        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
50        display::StoredDisplay,
51        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
52        events::StoredEvent,
53        obj_indices::StoredObjectVersion,
54        objects::{
55            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
56            StoredObjects,
57        },
58        packages::StoredPackage,
59        transactions::{
60            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
61        },
62        tx_indices::TxIndexSplit,
63        watermarks::StoredWatermark,
64    },
65    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
66    persist_chunk_into_table_in_existing_connection,
67    pruning::pruner::PrunableTable,
68    read_only_blocking, run_query, run_query_with_retry,
69    schema::{
70        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
71        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
72        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
73        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
74        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
75        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
76        tx_wrapped_or_deleted_objects, watermarks,
77    },
78    store::{IndexerStore, diesel_macro::mark_in_blocking_pool},
79    transactional_blocking_with_retry,
80    types::{
81        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
82        IndexedPackage, IndexedTransaction, TxIndex,
83    },
84};
85
/// Position of a transaction in the global ordering stored in the
/// `tx_global_order` table.
///
/// Both fields carry the value of the table column of the same name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TxGlobalOrderCursor {
    /// Value of the `global_sequence_number` column.
    pub global_sequence_number: i64,
    /// Value of the `optimistic_sequence_number` column.
    pub optimistic_sequence_number: i64,
}
93
/// Splits `$items` into consecutive groups of at most `$chunk_len` elements
/// and collects them into a `Vec<Vec<_>>`.
///
/// The expansion calls `.chunks(..)` on the iterator returned by
/// `$items.into_iter()`, so an adaptor providing that method must be in scope
/// at the call site (this file imports `itertools::Itertools`).
#[macro_export]
macro_rules! chunk {
    ($items:expr, $chunk_len:expr) => {{
        $items
            .into_iter()
            .chunks($chunk_len)
            .into_iter()
            .map(|group| group.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}
105
/// Deletes every row of `$index_table` whose `tx_sequence_number` lies
/// between `$first_tx` and `$last_tx` (SQL `BETWEEN`).
///
/// The expansion is a single statement ending in `?`: a failed delete is
/// converted into an `IndexerError`, annotated with `$failure_context`, and
/// returned from the enclosing function.
macro_rules! prune_tx_or_event_indice_table {
    ($index_table:ident, $db_conn:expr, $first_tx:expr, $last_tx:expr, $failure_context:expr) => {
        diesel::delete(
            $index_table::table
                .filter($index_table::tx_sequence_number.between($first_tx, $last_tx)),
        )
        .execute($db_conn)
        .map_err(IndexerError::from)
        .context($failure_context)?;
    };
}
114
/// Number of rows written per statement when an update made inside a single
/// DB transaction is split into several statements.
// TODO: I think with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX`
// is now less relevant. We should do experiments and remove it if it's true.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1_000;
/// Default amount of rows to update in one DB transaction.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
/// Default amount of rows to update in one DB transaction, for objects
/// particularly. Having this number too high may cause many db deadlocks
/// because of optimistic locking.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
/// One hour; handed to `transactional_blocking_with_retry!` as its last
/// argument by the write paths in this file.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(60 * 60);
127
/// Batch-size settings carried by [`PgIndexerStore`].
///
/// `Debug` is derived so the effective configuration can be logged or shown
/// in diagnostics.
#[derive(Clone, Debug)]
pub struct PgIndexerStoreConfig {
    /// Batch size read from the `PG_COMMIT_PARALLEL_CHUNK_SIZE` environment
    /// variable by `PgIndexerStore::new`.
    pub parallel_chunk_size: usize,
    /// Batch size read from the `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE`
    /// environment variable by `PgIndexerStore::new`.
    pub parallel_objects_chunk_size: usize,
}
133
134pub struct PgIndexerStore {
135    blocking_cp: ConnectionPool,
136    metrics: IndexerMetrics,
137    partition_manager: PgPartitionManager,
138    config: PgIndexerStoreConfig,
139}
140
141impl Clone for PgIndexerStore {
142    fn clone(&self) -> PgIndexerStore {
143        Self {
144            blocking_cp: self.blocking_cp.clone(),
145            metrics: self.metrics.clone(),
146            partition_manager: self.partition_manager.clone(),
147            config: self.config.clone(),
148        }
149    }
150}
151
152impl PgIndexerStore {
153    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
154        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
155            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
156            .parse::<usize>()
157            .unwrap();
158        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
159            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
160            .parse::<usize>()
161            .unwrap();
162        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
163            .expect("failed to initialize partition manager");
164        let config = PgIndexerStoreConfig {
165            parallel_chunk_size,
166            parallel_objects_chunk_size,
167        };
168
169        Self {
170            blocking_cp,
171            metrics,
172            partition_manager,
173            config,
174        }
175    }
176
177    pub fn get_metrics(&self) -> IndexerMetrics {
178        self.metrics.clone()
179    }
180
181    pub fn blocking_cp(&self) -> ConnectionPool {
182        self.blocking_cp.clone()
183    }
184
185    pub(crate) async fn get_latest_epoch_id_in_blocking_worker(
186        &self,
187    ) -> Result<Option<u64>, IndexerError> {
188        self.execute_in_blocking_worker(move |this| this.get_latest_epoch_id())
189            .await
190    }
191
192    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
193        read_only_blocking!(&self.blocking_cp, |conn| {
194            epochs::dsl::epochs
195                .select(max(epochs::epoch))
196                .first::<Option<i64>>(conn)
197                .map(|v| v.map(|v| v as u64))
198        })
199        .context("Failed reading latest epoch id from PostgresDB")
200    }
201
202    /// Get the range of the protocol versions that need to be indexed.
203    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
204        // We start indexing from the next protocol version after the latest one stored
205        // in the db.
206        let start = read_only_blocking!(&self.blocking_cp, |conn| {
207            protocol_configs::dsl::protocol_configs
208                .select(max(protocol_configs::protocol_version))
209                .first::<Option<i64>>(conn)
210        })
211        .context("Failed reading latest protocol version from PostgresDB")?
212        .map_or(1, |v| v + 1);
213
214        // We end indexing at the protocol version of the latest epoch stored in the db.
215        let end = read_only_blocking!(&self.blocking_cp, |conn| {
216            epochs::dsl::epochs
217                .select(max(epochs::protocol_version))
218                .first::<Option<i64>>(conn)
219        })
220        .context("Failed reading latest epoch protocol version from PostgresDB")?
221        .unwrap_or(1);
222        Ok((start, end))
223    }
224
225    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
226        read_only_blocking!(&self.blocking_cp, |conn| {
227            chain_identifier::dsl::chain_identifier
228                .select(chain_identifier::checkpoint_digest)
229                .first::<Vec<u8>>(conn)
230                .optional()
231        })
232        .context("Failed reading chain id from PostgresDB")
233    }
234
235    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
236        read_only_blocking!(&self.blocking_cp, |conn| {
237            checkpoints::dsl::checkpoints
238                .select(max(checkpoints::sequence_number))
239                .first::<Option<i64>>(conn)
240                .map(|v| v.map(|v| v as u64))
241        })
242        .context("Failed reading latest checkpoint sequence number from PostgresDB")
243    }
244
245    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
246        read_only_blocking!(&self.blocking_cp, |conn| {
247            checkpoints::dsl::checkpoints
248                .select((
249                    min(checkpoints::sequence_number),
250                    max(checkpoints::sequence_number),
251                ))
252                .first::<(Option<i64>, Option<i64>)>(conn)
253                .map(|(min, max)| {
254                    (
255                        min.unwrap_or_default() as u64,
256                        max.unwrap_or_default() as u64,
257                    )
258                })
259        })
260        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
261    }
262
263    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
264        read_only_blocking!(&self.blocking_cp, |conn| {
265            epochs::dsl::epochs
266                .select((min(epochs::epoch), max(epochs::epoch)))
267                .first::<(Option<i64>, Option<i64>)>(conn)
268                .map(|(min, max)| {
269                    (
270                        min.unwrap_or_default() as u64,
271                        max.unwrap_or_default() as u64,
272                    )
273                })
274        })
275        .context("Failed reading min and max epoch numbers from PostgresDB")
276    }
277
278    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
279        read_only_blocking!(&self.blocking_cp, |conn| {
280            pruner_cp_watermark::dsl::pruner_cp_watermark
281                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
282                .first::<Option<i64>>(conn)
283                .map(|v| v.unwrap_or_default() as u64)
284        })
285        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
286    }
287
288    fn get_checkpoint_range_for_epoch(
289        &self,
290        epoch: u64,
291    ) -> Result<(u64, Option<u64>), IndexerError> {
292        read_only_blocking!(&self.blocking_cp, |conn| {
293            epochs::dsl::epochs
294                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
295                .filter(epochs::epoch.eq(epoch as i64))
296                .first::<(i64, Option<i64>)>(conn)
297                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
298        })
299        .context(
300            format!("failed reading checkpoint range from PostgresDB for epoch {epoch}").as_str(),
301        )
302    }
303
304    fn get_transaction_range_for_checkpoint(
305        &self,
306        checkpoint: u64,
307    ) -> Result<(u64, u64), IndexerError> {
308        read_only_blocking!(&self.blocking_cp, |conn| {
309            pruner_cp_watermark::dsl::pruner_cp_watermark
310                .select((
311                    pruner_cp_watermark::min_tx_sequence_number,
312                    pruner_cp_watermark::max_tx_sequence_number,
313                ))
314                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
315                .first::<(i64, i64)>(conn)
316                .map(|(min, max)| (min as u64, max as u64))
317        })
318        .context(
319            format!("failed reading transaction range from PostgresDB for checkpoint {checkpoint}")
320                .as_str(),
321        )
322    }
323
324    pub(crate) async fn get_global_order_for_tx_seq_in_blocking_worker(
325        &self,
326        tx_seq: i64,
327    ) -> Result<TxGlobalOrderCursor, IndexerError> {
328        self.execute_in_blocking_worker(move |this| this.get_global_order_for_tx_seq(tx_seq))
329            .await
330    }
331
332    fn get_global_order_for_tx_seq(
333        &self,
334        tx_seq: i64,
335    ) -> Result<TxGlobalOrderCursor, IndexerError> {
336        let result = read_only_blocking!(&self.blocking_cp, |conn| {
337            tx_global_order::dsl::tx_global_order
338                .select((
339                    tx_global_order::global_sequence_number,
340                    tx_global_order::optimistic_sequence_number,
341                ))
342                .filter(tx_global_order::chk_tx_sequence_number.eq(tx_seq))
343                .first::<(i64, i64)>(conn)
344        })
345        .context(
346            format!("failed reading global sequence number from PostgresDB for tx seq {tx_seq}")
347                .as_str(),
348        )?;
349        let (global_sequence_number, optimistic_sequence_number) = result;
350        Ok(TxGlobalOrderCursor {
351            global_sequence_number,
352            optimistic_sequence_number,
353        })
354    }
355
356    pub(crate) async fn prune_optimistic_transactions_up_to_in_blocking_worker(
357        &self,
358        to: TxGlobalOrderCursor,
359        limit: i64,
360    ) -> IndexerResult<usize> {
361        self.execute_in_blocking_worker(move |this| {
362            this.prune_optimistic_transactions_up_to(to, limit)
363        })
364        .await
365    }
366
367    fn prune_optimistic_transactions_up_to(
368        &self,
369        to: TxGlobalOrderCursor,
370        limit: i64,
371    ) -> IndexerResult<usize> {
372        transactional_blocking_with_retry!(
373            &self.blocking_cp,
374            |conn| {
375                let sql = r#"
376                    WITH ids_to_delete AS (
377                         SELECT global_sequence_number, optimistic_sequence_number
378                         FROM optimistic_transactions
379                         WHERE (global_sequence_number, optimistic_sequence_number) <= ($1, $2)
380                         ORDER BY global_sequence_number, optimistic_sequence_number
381                         FOR UPDATE LIMIT $3
382                     )
383                     DELETE FROM optimistic_transactions otx
384                     USING ids_to_delete
385                     WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
386                           (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
387                "#;
388                diesel::sql_query(sql)
389                    .bind::<BigInt, _>(to.global_sequence_number)
390                    .bind::<BigInt, _>(to.optimistic_sequence_number)
391                    .bind::<BigInt, _>(limit)
392                    .execute(conn)
393                    .map_err(IndexerError::from)
394                    .context(
395                        format!("failed to prune optimistic_transactions table to {to:?} with limit {limit}").as_str(),
396                    )
397            },
398            PG_DB_COMMIT_SLEEP_DURATION
399        )
400    }
401
402    fn get_latest_object_snapshot_watermark(
403        &self,
404    ) -> Result<Option<CommitterWatermark>, IndexerError> {
405        read_only_blocking!(&self.blocking_cp, |conn| {
406            watermarks::table
407                .select((
408                    watermarks::epoch_hi_inclusive,
409                    watermarks::checkpoint_hi_inclusive,
410                    watermarks::tx_hi,
411                ))
412                .filter(
413                    watermarks::entity
414                        .eq(ObjectsSnapshotHandlerTables::ObjectsSnapshot.to_string()),
415                )
416                .first::<(i64, i64, i64)>(conn)
417                // Handle case where the watermark is not set yet
418                .optional()
419                .map(|v| {
420                    v.map(|(epoch, cp, tx)| CommitterWatermark {
421                        epoch_hi_inclusive: epoch as u64,
422                        checkpoint_hi_inclusive: cp as u64,
423                        tx_hi: tx as u64,
424                    })
425                })
426        })
427        .context("Failed reading latest object snapshot watermark from PostgresDB")
428    }
429
430    fn get_latest_object_snapshot_checkpoint_sequence_number(
431        &self,
432    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
433        read_only_blocking!(&self.blocking_cp, |conn| {
434            objects_snapshot::table
435                .select(max(objects_snapshot::checkpoint_sequence_number))
436                .first::<Option<i64>>(conn)
437                .map(|v| v.map(|v| v as CheckpointSequenceNumber))
438        })
439        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
440    }
441
442    fn persist_display_updates(
443        &self,
444        display_updates: BTreeMap<String, StoredDisplay>,
445    ) -> Result<(), IndexerError> {
446        transactional_blocking_with_retry!(
447            &self.blocking_cp,
448            {
449                let value = display_updates.values().collect::<Vec<_>>();
450                |conn| self.persist_displays_in_existing_transaction(conn, value)
451            },
452            PG_DB_COMMIT_SLEEP_DURATION
453        )?;
454
455        Ok(())
456    }
457
458    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
459        let guard = self
460            .metrics
461            .checkpoint_db_commit_latency_objects_chunks
462            .start_timer();
463        let len = objects.len();
464        let raw_query = r#"
465            INSERT INTO objects (
466                object_id,
467                object_version,
468                object_digest,
469                owner_type,
470                owner_id,
471                object_type,
472                object_type_package,
473                object_type_module,
474                object_type_name,
475                serialized_object,
476                coin_type,
477                coin_balance,
478                df_kind
479            )
480            SELECT
481                u.object_id,
482                u.object_version,
483                u.object_digest,
484                u.owner_type,
485                u.owner_id,
486                u.object_type,
487                u.object_type_package,
488                u.object_type_module,
489                u.object_type_name,
490                u.serialized_object,
491                u.coin_type,
492                u.coin_balance,
493                u.df_kind
494            FROM UNNEST(
495                $1::BYTEA[],
496                $2::BIGINT[],
497                $3::BYTEA[],
498                $4::SMALLINT[],
499                $5::BYTEA[],
500                $6::TEXT[],
501                $7::BYTEA[],
502                $8::TEXT[],
503                $9::TEXT[],
504                $10::BYTEA[],
505                $11::TEXT[],
506                $12::BIGINT[],
507                $13::SMALLINT[],
508                $14::BYTEA[]
509            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest)
510            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
511            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
512            ON CONFLICT (object_id) DO UPDATE
513            SET
514                object_version = EXCLUDED.object_version,
515                object_digest = EXCLUDED.object_digest,
516                owner_type = EXCLUDED.owner_type,
517                owner_id = EXCLUDED.owner_id,
518                object_type = EXCLUDED.object_type,
519                object_type_package = EXCLUDED.object_type_package,
520                object_type_module = EXCLUDED.object_type_module,
521                object_type_name = EXCLUDED.object_type_name,
522                serialized_object = EXCLUDED.serialized_object,
523                coin_type = EXCLUDED.coin_type,
524                coin_balance = EXCLUDED.coin_balance,
525                df_kind = EXCLUDED.df_kind
526        "#;
527        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
528            .into_iter()
529            .map(LiveObject::split)
530            .map(|(indexed_object, tx_digest)| {
531                (
532                    StoredObject::from(indexed_object),
533                    tx_digest.into_inner().to_vec(),
534                )
535            })
536            .unzip();
537        let query = diesel::sql_query(raw_query)
538            .bind::<Array<Bytea>, _>(objects.object_ids)
539            .bind::<Array<BigInt>, _>(objects.object_versions)
540            .bind::<Array<Bytea>, _>(objects.object_digests)
541            .bind::<Array<SmallInt>, _>(objects.owner_types)
542            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
543            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
544            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
545            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
546            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
547            .bind::<Array<Bytea>, _>(objects.serialized_objects)
548            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
549            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
550            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
551            .bind::<Array<Bytea>, _>(tx_digests);
552        transactional_blocking_with_retry!(
553            &self.blocking_cp,
554            |conn| {
555                query.clone().execute(conn)?;
556                Ok::<(), IndexerError>(())
557            },
558            PG_DB_COMMIT_SLEEP_DURATION
559        )
560        .tap_ok(|_| {
561            let elapsed = guard.stop_and_record();
562            info!(elapsed, "Persisted {len} chunked objects");
563        })
564        .tap_err(|e| {
565            tracing::error!("failed to persist object mutations with error: {e}");
566        })
567    }
568
569    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
570        let guard = self
571            .metrics
572            .checkpoint_db_commit_latency_objects_chunks
573            .start_timer();
574        let len = objects.len();
575        let raw_query = r#"
576            DELETE FROM objects
577            WHERE object_id IN (
578                SELECT u.object_id
579                FROM UNNEST(
580                    $1::BYTEA[],
581                    $2::BYTEA[]
582                ) AS u(object_id, tx_digest)
583                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
584                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
585            )
586        "#;
587        let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
588            .into_iter()
589            .map(|removed_object| {
590                (
591                    removed_object.object_id().to_vec(),
592                    removed_object.transaction_digest.into_inner().to_vec(),
593                )
594            })
595            .unzip();
596        let query = diesel::sql_query(raw_query)
597            .bind::<Array<Bytea>, _>(object_ids)
598            .bind::<Array<Bytea>, _>(tx_digests);
599        transactional_blocking_with_retry!(
600            &self.blocking_cp,
601            |conn| {
602                query.clone().execute(conn)?;
603                Ok::<(), IndexerError>(())
604            },
605            PG_DB_COMMIT_SLEEP_DURATION
606        )
607        .tap_ok(|_| {
608            let elapsed = guard.stop_and_record();
609            info!(elapsed, "Deleted {len} chunked objects");
610        })
611        .tap_err(|e| {
612            tracing::error!("failed to persist object deletions with error: {e}");
613        })
614    }
615
616    fn persist_object_mutation_chunk_in_existing_transaction(
617        &self,
618        conn: &mut PgConnection,
619        mutated_object_mutation_chunk: Vec<StoredObject>,
620    ) -> Result<(), IndexerError> {
621        on_conflict_do_update!(
622            objects::table,
623            mutated_object_mutation_chunk,
624            objects::object_id,
625            (
626                objects::object_id.eq(excluded(objects::object_id)),
627                objects::object_version.eq(excluded(objects::object_version)),
628                objects::object_digest.eq(excluded(objects::object_digest)),
629                objects::owner_type.eq(excluded(objects::owner_type)),
630                objects::owner_id.eq(excluded(objects::owner_id)),
631                objects::object_type.eq(excluded(objects::object_type)),
632                objects::serialized_object.eq(excluded(objects::serialized_object)),
633                objects::coin_type.eq(excluded(objects::coin_type)),
634                objects::coin_balance.eq(excluded(objects::coin_balance)),
635                objects::df_kind.eq(excluded(objects::df_kind)),
636            ),
637            conn
638        );
639        Ok::<(), IndexerError>(())
640    }
641
642    fn persist_object_deletion_chunk_in_existing_transaction(
643        &self,
644        conn: &mut PgConnection,
645        deleted_objects_chunk: Vec<StoredDeletedObject>,
646    ) -> Result<(), IndexerError> {
647        diesel::delete(
648            objects::table.filter(
649                objects::object_id.eq_any(
650                    deleted_objects_chunk
651                        .iter()
652                        .map(|o| o.object_id.clone())
653                        .collect::<Vec<_>>(),
654                ),
655            ),
656        )
657        .execute(conn)
658        .map_err(IndexerError::from)
659        .context("Failed to write object deletion to PostgresDB")?;
660
661        Ok::<(), IndexerError>(())
662    }
663
664    fn backfill_objects_snapshot_chunk(
665        &self,
666        objects_snapshot: Vec<StoredObjectSnapshot>,
667    ) -> Result<(), IndexerError> {
668        let guard = self
669            .metrics
670            .checkpoint_db_commit_latency_objects_snapshot_chunks
671            .start_timer();
672        transactional_blocking_with_retry!(
673            &self.blocking_cp,
674            |conn| {
675                for objects_snapshot_chunk in
676                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
677                {
678                    on_conflict_do_update!(
679                        objects_snapshot::table,
680                        objects_snapshot_chunk,
681                        objects_snapshot::object_id,
682                        (
683                            objects_snapshot::object_version
684                                .eq(excluded(objects_snapshot::object_version)),
685                            objects_snapshot::object_status
686                                .eq(excluded(objects_snapshot::object_status)),
687                            objects_snapshot::object_digest
688                                .eq(excluded(objects_snapshot::object_digest)),
689                            objects_snapshot::checkpoint_sequence_number
690                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
691                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
692                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
693                            objects_snapshot::object_type_package
694                                .eq(excluded(objects_snapshot::object_type_package)),
695                            objects_snapshot::object_type_module
696                                .eq(excluded(objects_snapshot::object_type_module)),
697                            objects_snapshot::object_type_name
698                                .eq(excluded(objects_snapshot::object_type_name)),
699                            objects_snapshot::object_type
700                                .eq(excluded(objects_snapshot::object_type)),
701                            objects_snapshot::serialized_object
702                                .eq(excluded(objects_snapshot::serialized_object)),
703                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
704                            objects_snapshot::coin_balance
705                                .eq(excluded(objects_snapshot::coin_balance)),
706                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
707                        ),
708                        conn
709                    );
710                }
711                Ok::<(), IndexerError>(())
712            },
713            PG_DB_COMMIT_SLEEP_DURATION
714        )
715        .tap_ok(|_| {
716            let elapsed = guard.stop_and_record();
717            info!(
718                elapsed,
719                "Persisted {} chunked objects snapshot",
720                objects_snapshot.len(),
721            );
722        })
723        .tap_err(|e| {
724            tracing::error!("failed to persist object snapshot with error: {e}");
725        })
726    }
727
728    fn persist_objects_history_chunk(
729        &self,
730        stored_objects_history: Vec<StoredHistoryObject>,
731    ) -> Result<(), IndexerError> {
732        let guard = self
733            .metrics
734            .checkpoint_db_commit_latency_objects_history_chunks
735            .start_timer();
736        transactional_blocking_with_retry!(
737            &self.blocking_cp,
738            |conn| {
739                for stored_objects_history_chunk in
740                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
741                {
742                    insert_or_ignore_into!(
743                        objects_history::table,
744                        stored_objects_history_chunk,
745                        conn
746                    );
747                }
748                Ok::<(), IndexerError>(())
749            },
750            PG_DB_COMMIT_SLEEP_DURATION
751        )
752        .tap_ok(|_| {
753            let elapsed = guard.stop_and_record();
754            info!(
755                elapsed,
756                "Persisted {} chunked objects history",
757                stored_objects_history.len(),
758            );
759        })
760        .tap_err(|e| {
761            tracing::error!("failed to persist object history with error: {e}");
762        })
763    }
764
765    fn persist_object_version_chunk(
766        &self,
767        object_versions: Vec<StoredObjectVersion>,
768    ) -> Result<(), IndexerError> {
769        let guard = self
770            .metrics
771            .checkpoint_db_commit_latency_objects_version_chunks
772            .start_timer();
773
774        transactional_blocking_with_retry!(
775            &self.blocking_cp,
776            |conn| {
777                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
778                {
779                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
780                }
781                Ok::<(), IndexerError>(())
782            },
783            PG_DB_COMMIT_SLEEP_DURATION
784        )
785        .tap_ok(|_| {
786            let elapsed = guard.stop_and_record();
787            info!(
788                elapsed,
789                "Persisted {} chunked object versions",
790                object_versions.len(),
791            );
792        })
793        .tap_err(|e| {
794            tracing::error!("failed to persist object versions with error: {e}");
795        })
796    }
797
798    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
799        let Some(first_checkpoint) = checkpoints.first() else {
800            return Ok(());
801        };
802
803        // If the first checkpoint has sequence number 0, we need to persist the digest
804        // as chain identifier.
805        if first_checkpoint.sequence_number == 0 {
806            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
807            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
808            transactional_blocking_with_retry!(
809                &self.blocking_cp,
810                |conn| {
811                    let checkpoint_digest =
812                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
813                    insert_or_ignore_into!(
814                        chain_identifier::table,
815                        StoredChainIdentifier { checkpoint_digest },
816                        conn
817                    );
818                    Ok::<(), IndexerError>(())
819                },
820                PG_DB_COMMIT_SLEEP_DURATION
821            )?;
822        }
823        let guard = self
824            .metrics
825            .checkpoint_db_commit_latency_checkpoints
826            .start_timer();
827
828        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
829        transactional_blocking_with_retry!(
830            &self.blocking_cp,
831            |conn| {
832                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
833                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
834                }
835                Ok::<(), IndexerError>(())
836            },
837            PG_DB_COMMIT_SLEEP_DURATION
838        )
839        .tap_ok(|_| {
840            info!(
841                "Persisted {} pruner_cp_watermark rows.",
842                stored_cp_txs.len(),
843            );
844        })
845        .tap_err(|e| {
846            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
847        })?;
848
849        let stored_checkpoints = checkpoints
850            .iter()
851            .map(StoredCheckpoint::from)
852            .collect::<Vec<_>>();
853        transactional_blocking_with_retry!(
854            &self.blocking_cp,
855            |conn| {
856                for stored_checkpoint_chunk in
857                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
858                {
859                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
860                    let time_now_ms = chrono::Utc::now().timestamp_millis();
861                    for stored_checkpoint in stored_checkpoint_chunk {
862                        self.metrics
863                            .db_commit_lag_ms
864                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
865                        self.metrics.max_committed_checkpoint_sequence_number.set(
866                            stored_checkpoint.sequence_number,
867                        );
868                        self.metrics.committed_checkpoint_timestamp_ms.set(
869                            stored_checkpoint.timestamp_ms,
870                        );
871                    }
872                    for stored_checkpoint in stored_checkpoint_chunk {
873                        info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
874                    }
875                }
876                Ok::<(), IndexerError>(())
877            },
878            PG_DB_COMMIT_SLEEP_DURATION
879        )
880        .tap_ok(|_| {
881            let elapsed = guard.stop_and_record();
882            info!(
883                elapsed,
884                "Persisted {} checkpoints",
885                stored_checkpoints.len()
886            );
887        })
888        .tap_err(|e| {
889            tracing::error!("failed to persist checkpoints with error: {e}");
890        })
891    }
892
893    fn persist_transactions_chunk(
894        &self,
895        transactions: Vec<IndexedTransaction>,
896    ) -> Result<(), IndexerError> {
897        let guard = self
898            .metrics
899            .checkpoint_db_commit_latency_transactions_chunks
900            .start_timer();
901        let transformation_guard = self
902            .metrics
903            .checkpoint_db_commit_latency_transactions_chunks_transformation
904            .start_timer();
905        let transactions = transactions
906            .iter()
907            .map(StoredTransaction::from)
908            .collect::<Vec<_>>();
909        drop(transformation_guard);
910
911        transactional_blocking_with_retry!(
912            &self.blocking_cp,
913            |conn| {
914                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
915                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
916                }
917                Ok::<(), IndexerError>(())
918            },
919            PG_DB_COMMIT_SLEEP_DURATION
920        )
921        .tap_ok(|_| {
922            let elapsed = guard.stop_and_record();
923            info!(
924                elapsed,
925                "Persisted {} chunked transactions",
926                transactions.len()
927            );
928        })
929        .tap_err(|e| {
930            tracing::error!("failed to persist transactions with error: {e}");
931        })
932    }
933
934    fn persist_tx_global_order_chunk(
935        &self,
936        tx_order: Vec<CheckpointTxGlobalOrder>,
937    ) -> Result<(), IndexerError> {
938        let guard = self
939            .metrics
940            .checkpoint_db_commit_latency_tx_insertion_order_chunks
941            .start_timer();
942
943        transactional_blocking_with_retry!(
944            &self.blocking_cp,
945            |conn| {
946                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
947                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
948                }
949                Ok::<(), IndexerError>(())
950            },
951            PG_DB_COMMIT_SLEEP_DURATION
952        )
953        .tap_ok(|_| {
954            let elapsed = guard.stop_and_record();
955            info!(
956                elapsed,
957                "Persisted {} chunked txs insertion order",
958                tx_order.len()
959            );
960        })
961        .tap_err(|e| {
962            tracing::error!("failed to persist txs insertion order with error: {e}");
963        })
964    }
965
966    /// We enforce index-status semantics for checkpointed transactions
967    /// in `tx_global_order`.
968    ///
969    /// Namely, checkpointed transactions (i.e. with `optimistic_sequence_number
970    /// == 0`) are updated to `optimistic_sequence_number == -1` to indicate
971    /// that they have been persisted in the database.
972    fn update_status_for_checkpoint_transactions_chunk(
973        &self,
974        tx_order: Vec<CheckpointTxGlobalOrder>,
975    ) -> Result<(), IndexerError> {
976        let guard = self
977            .metrics
978            .checkpoint_db_commit_latency_tx_insertion_order_chunks
979            .start_timer();
980
981        let num_transactions = tx_order.len();
982        transactional_blocking_with_retry!(
983            &self.blocking_cp,
984            |conn| {
985                on_conflict_do_update_with_condition!(
986                    tx_global_order::table,
987                    tx_order.clone(),
988                    tx_global_order::tx_digest,
989                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
990                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
991                    conn
992                );
993                on_conflict_do_update_with_condition!(
994                    tx_global_order::table,
995                    tx_order.clone(),
996                    tx_global_order::tx_digest,
997                    tx_global_order::chk_tx_sequence_number
998                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
999                    tx_global_order::chk_tx_sequence_number.is_null(),
1000                    conn
1001                );
1002                Ok::<(), IndexerError>(())
1003            },
1004            PG_DB_COMMIT_SLEEP_DURATION
1005        )
1006        .tap_ok(|_| {
1007            let elapsed = guard.stop_and_record();
1008            info!(
1009                elapsed,
1010                "Updated {} chunked values of `tx_global_order`", num_transactions
1011            );
1012        })
1013        .tap_err(|e| {
1014            tracing::error!("failed to update `tx_global_order` with error: {e}");
1015        })
1016    }
1017
1018    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1019        let guard = self
1020            .metrics
1021            .checkpoint_db_commit_latency_events_chunks
1022            .start_timer();
1023        let len = events.len();
1024        let events = events
1025            .into_iter()
1026            .map(StoredEvent::from)
1027            .collect::<Vec<_>>();
1028
1029        transactional_blocking_with_retry!(
1030            &self.blocking_cp,
1031            |conn| {
1032                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1033                    insert_or_ignore_into!(events::table, event_chunk, conn);
1034                }
1035                Ok::<(), IndexerError>(())
1036            },
1037            PG_DB_COMMIT_SLEEP_DURATION
1038        )
1039        .tap_ok(|_| {
1040            let elapsed = guard.stop_and_record();
1041            info!(elapsed, "Persisted {} chunked events", len);
1042        })
1043        .tap_err(|e| {
1044            tracing::error!("failed to persist events with error: {e}");
1045        })
1046    }
1047
1048    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1049        if packages.is_empty() {
1050            return Ok(());
1051        }
1052        let guard = self
1053            .metrics
1054            .checkpoint_db_commit_latency_packages
1055            .start_timer();
1056        let packages = packages
1057            .into_iter()
1058            .map(StoredPackage::from)
1059            .collect::<Vec<_>>();
1060        transactional_blocking_with_retry!(
1061            &self.blocking_cp,
1062            |conn| {
1063                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1064                    on_conflict_do_update!(
1065                        packages::table,
1066                        packages_chunk,
1067                        packages::package_id,
1068                        (
1069                            packages::package_id.eq(excluded(packages::package_id)),
1070                            packages::move_package.eq(excluded(packages::move_package)),
1071                        ),
1072                        conn
1073                    );
1074                }
1075                Ok::<(), IndexerError>(())
1076            },
1077            PG_DB_COMMIT_SLEEP_DURATION
1078        )
1079        .tap_ok(|_| {
1080            let elapsed = guard.stop_and_record();
1081            info!(elapsed, "Persisted {} packages", packages.len());
1082        })
1083        .tap_err(|e| {
1084            tracing::error!("failed to persist packages with error: {e}");
1085        })
1086    }
1087
1088    async fn persist_event_indices_chunk(
1089        &self,
1090        indices: Vec<EventIndex>,
1091    ) -> Result<(), IndexerError> {
1092        let guard = self
1093            .metrics
1094            .checkpoint_db_commit_latency_event_indices_chunks
1095            .start_timer();
1096        let len = indices.len();
1097        let (
1098            event_emit_packages,
1099            event_emit_modules,
1100            event_senders,
1101            event_struct_packages,
1102            event_struct_modules,
1103            event_struct_names,
1104            event_struct_instantiations,
1105        ) = indices.into_iter().map(|i| i.split()).fold(
1106            (
1107                Vec::new(),
1108                Vec::new(),
1109                Vec::new(),
1110                Vec::new(),
1111                Vec::new(),
1112                Vec::new(),
1113                Vec::new(),
1114            ),
1115            |(
1116                mut event_emit_packages,
1117                mut event_emit_modules,
1118                mut event_senders,
1119                mut event_struct_packages,
1120                mut event_struct_modules,
1121                mut event_struct_names,
1122                mut event_struct_instantiations,
1123            ),
1124             index| {
1125                event_emit_packages.push(index.0);
1126                event_emit_modules.push(index.1);
1127                event_senders.push(index.2);
1128                event_struct_packages.push(index.3);
1129                event_struct_modules.push(index.4);
1130                event_struct_names.push(index.5);
1131                event_struct_instantiations.push(index.6);
1132                (
1133                    event_emit_packages,
1134                    event_emit_modules,
1135                    event_senders,
1136                    event_struct_packages,
1137                    event_struct_modules,
1138                    event_struct_names,
1139                    event_struct_instantiations,
1140                )
1141            },
1142        );
1143
1144        // Now persist all the event indices in parallel into their tables.
1145        let mut futures = vec![];
1146        futures.push(self.spawn_blocking_task(move |this| {
1147            persist_chunk_into_table!(
1148                event_emit_package::table,
1149                event_emit_packages,
1150                &this.blocking_cp
1151            )
1152        }));
1153
1154        futures.push(self.spawn_blocking_task(move |this| {
1155            persist_chunk_into_table!(
1156                event_emit_module::table,
1157                event_emit_modules,
1158                &this.blocking_cp
1159            )
1160        }));
1161
1162        futures.push(self.spawn_blocking_task(move |this| {
1163            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
1164        }));
1165
1166        futures.push(self.spawn_blocking_task(move |this| {
1167            persist_chunk_into_table!(
1168                event_struct_package::table,
1169                event_struct_packages,
1170                &this.blocking_cp
1171            )
1172        }));
1173
1174        futures.push(self.spawn_blocking_task(move |this| {
1175            persist_chunk_into_table!(
1176                event_struct_module::table,
1177                event_struct_modules,
1178                &this.blocking_cp
1179            )
1180        }));
1181
1182        futures.push(self.spawn_blocking_task(move |this| {
1183            persist_chunk_into_table!(
1184                event_struct_name::table,
1185                event_struct_names,
1186                &this.blocking_cp
1187            )
1188        }));
1189
1190        futures.push(self.spawn_blocking_task(move |this| {
1191            persist_chunk_into_table!(
1192                event_struct_instantiation::table,
1193                event_struct_instantiations,
1194                &this.blocking_cp
1195            )
1196        }));
1197
1198        futures::future::try_join_all(futures)
1199            .await
1200            .map_err(|e| {
1201                tracing::error!("failed to join event indices futures in a chunk: {e}");
1202                IndexerError::from(e)
1203            })?
1204            .into_iter()
1205            .collect::<Result<Vec<_>, _>>()
1206            .map_err(|e| {
1207                IndexerError::PostgresWrite(format!(
1208                    "Failed to persist all event indices in a chunk: {e:?}"
1209                ))
1210            })?;
1211        let elapsed = guard.stop_and_record();
1212        info!(elapsed, "Persisted {} chunked event indices", len);
1213        Ok(())
1214    }
1215
1216    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1217        let guard = self
1218            .metrics
1219            .checkpoint_db_commit_latency_tx_indices_chunks
1220            .start_timer();
1221        let len = indices.len();
1222
1223        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();
1224
1225        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
1226        let recipients: Vec<_> = splits
1227            .iter()
1228            .flat_map(|ix| ix.tx_recipients.clone())
1229            .collect();
1230        let input_objects: Vec<_> = splits
1231            .iter()
1232            .flat_map(|ix| ix.tx_input_objects.clone())
1233            .collect();
1234        let changed_objects: Vec<_> = splits
1235            .iter()
1236            .flat_map(|ix| ix.tx_changed_objects.clone())
1237            .collect();
1238        let wrapped_or_deleted_objects: Vec<_> = splits
1239            .iter()
1240            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
1241            .collect();
1242        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
1243        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
1244        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
1245        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1246        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1247
1248        let futures = [
1249            self.spawn_blocking_task(move |this| {
1250                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1251            }),
1252            self.spawn_blocking_task(move |this| {
1253                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1254            }),
1255            self.spawn_blocking_task(move |this| {
1256                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1257            }),
1258            self.spawn_blocking_task(move |this| {
1259                persist_chunk_into_table!(
1260                    tx_changed_objects::table,
1261                    changed_objects,
1262                    &this.blocking_cp
1263                )
1264            }),
1265            self.spawn_blocking_task(move |this| {
1266                persist_chunk_into_table!(
1267                    tx_wrapped_or_deleted_objects::table,
1268                    wrapped_or_deleted_objects,
1269                    &this.blocking_cp
1270                )
1271            }),
1272            self.spawn_blocking_task(move |this| {
1273                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1274            }),
1275            self.spawn_blocking_task(move |this| {
1276                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1277            }),
1278            self.spawn_blocking_task(move |this| {
1279                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1280            }),
1281            self.spawn_blocking_task(move |this| {
1282                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1283            }),
1284            self.spawn_blocking_task(move |this| {
1285                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1286            }),
1287        ];
1288
1289        futures::future::try_join_all(futures)
1290            .await
1291            .map_err(|e| {
1292                tracing::error!("failed to join tx indices futures in a chunk: {e}");
1293                IndexerError::from(e)
1294            })?
1295            .into_iter()
1296            .collect::<Result<Vec<_>, _>>()
1297            .map_err(|e| {
1298                IndexerError::PostgresWrite(format!(
1299                    "Failed to persist all tx indices in a chunk: {e:?}"
1300                ))
1301            })?;
1302        let elapsed = guard.stop_and_record();
1303        info!(elapsed, "Persisted {} chunked tx_indices", len);
1304        Ok(())
1305    }
1306
1307    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1308        let guard = self
1309            .metrics
1310            .checkpoint_db_commit_latency_epoch
1311            .start_timer();
1312        let epoch_id = epoch.new_epoch.epoch;
1313
1314        transactional_blocking_with_retry!(
1315            &self.blocking_cp,
1316            |conn| {
1317                if let Some(last_epoch) = &epoch.last_epoch {
1318                    info!(last_epoch.epoch, "Persisting epoch end data.");
1319                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
1320                        .set(last_epoch)
1321                        .execute(conn)?;
1322                }
1323
1324                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
1325                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
1326                Ok::<(), IndexerError>(())
1327            },
1328            PG_DB_COMMIT_SLEEP_DURATION
1329        )
1330        .tap_ok(|_| {
1331            let elapsed = guard.stop_and_record();
1332            info!(elapsed, epoch_id, "Persisted epoch beginning info");
1333        })
1334        .tap_err(|e| {
1335            tracing::error!("failed to persist epoch with error: {e}");
1336        })
1337    }
1338
1339    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1340        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1341        // partition_0 has been created, so no need to advance it.
1342        if let Some(last_epoch_id) = last_epoch_id {
1343            let last_db_epoch: Option<StoredEpochInfo> =
1344                read_only_blocking!(&self.blocking_cp, |conn| {
1345                    epochs::table
1346                        .filter(epochs::epoch.eq(last_epoch_id))
1347                        .first::<StoredEpochInfo>(conn)
1348                        .optional()
1349                })
1350                .context("Failed to read last epoch from PostgresDB")?;
1351            if let Some(last_epoch) = last_db_epoch {
1352                let epoch_partition_data =
1353                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1354                let table_partitions = self.partition_manager.get_table_partitions()?;
1355                for (table, (_, last_partition)) in table_partitions {
1356                    // Only advance epoch partition for epoch partitioned tables.
1357                    if !self
1358                        .partition_manager
1359                        .get_strategy(&table)
1360                        .is_epoch_partitioned()
1361                    {
1362                        continue;
1363                    }
1364                    let guard = self.metrics.advance_epoch_latency.start_timer();
1365                    self.partition_manager.advance_epoch(
1366                        table.clone(),
1367                        last_partition,
1368                        &epoch_partition_data,
1369                    )?;
1370                    let elapsed = guard.stop_and_record();
1371                    info!(
1372                        elapsed,
1373                        "Advanced epoch partition {} for table {}",
1374                        last_partition,
1375                        table.clone()
1376                    );
1377                }
1378            } else {
1379                tracing::error!("last epoch: {last_epoch_id} from PostgresDB is None.");
1380            }
1381        }
1382
1383        Ok(())
1384    }
1385
1386    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
1387        transactional_blocking_with_retry!(
1388            &self.blocking_cp,
1389            |conn| {
1390                diesel::delete(
1391                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
1392                )
1393                .execute(conn)
1394                .map_err(IndexerError::from)
1395                .context("Failed to prune checkpoints table")?;
1396
1397                Ok::<(), IndexerError>(())
1398            },
1399            PG_DB_COMMIT_SLEEP_DURATION
1400        )
1401    }
1402
1403    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1404        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1405        transactional_blocking_with_retry!(
1406            &self.blocking_cp,
1407            |conn| {
1408                prune_tx_or_event_indice_table!(
1409                    event_emit_module,
1410                    conn,
1411                    min_tx,
1412                    max_tx,
1413                    "Failed to prune event_emit_module table"
1414                );
1415                prune_tx_or_event_indice_table!(
1416                    event_emit_package,
1417                    conn,
1418                    min_tx,
1419                    max_tx,
1420                    "Failed to prune event_emit_package table"
1421                );
1422                prune_tx_or_event_indice_table![
1423                    event_senders,
1424                    conn,
1425                    min_tx,
1426                    max_tx,
1427                    "Failed to prune event_senders table"
1428                ];
1429                prune_tx_or_event_indice_table![
1430                    event_struct_instantiation,
1431                    conn,
1432                    min_tx,
1433                    max_tx,
1434                    "Failed to prune event_struct_instantiation table"
1435                ];
1436                prune_tx_or_event_indice_table![
1437                    event_struct_module,
1438                    conn,
1439                    min_tx,
1440                    max_tx,
1441                    "Failed to prune event_struct_module table"
1442                ];
1443                prune_tx_or_event_indice_table![
1444                    event_struct_name,
1445                    conn,
1446                    min_tx,
1447                    max_tx,
1448                    "Failed to prune event_struct_name table"
1449                ];
1450                prune_tx_or_event_indice_table![
1451                    event_struct_package,
1452                    conn,
1453                    min_tx,
1454                    max_tx,
1455                    "Failed to prune event_struct_package table"
1456                ];
1457                Ok::<(), IndexerError>(())
1458            },
1459            PG_DB_COMMIT_SLEEP_DURATION
1460        )
1461    }
1462
1463    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1464        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1465        transactional_blocking_with_retry!(
1466            &self.blocking_cp,
1467            |conn| {
1468                prune_tx_or_event_indice_table!(
1469                    tx_senders,
1470                    conn,
1471                    min_tx,
1472                    max_tx,
1473                    "Failed to prune tx_senders table"
1474                );
1475                prune_tx_or_event_indice_table!(
1476                    tx_recipients,
1477                    conn,
1478                    min_tx,
1479                    max_tx,
1480                    "Failed to prune tx_recipients table"
1481                );
1482                prune_tx_or_event_indice_table![
1483                    tx_input_objects,
1484                    conn,
1485                    min_tx,
1486                    max_tx,
1487                    "Failed to prune tx_input_objects table"
1488                ];
1489                prune_tx_or_event_indice_table![
1490                    tx_changed_objects,
1491                    conn,
1492                    min_tx,
1493                    max_tx,
1494                    "Failed to prune tx_changed_objects table"
1495                ];
1496                prune_tx_or_event_indice_table![
1497                    tx_wrapped_or_deleted_objects,
1498                    conn,
1499                    min_tx,
1500                    max_tx,
1501                    "Failed to prune tx_wrapped_or_deleted_objects table"
1502                ];
1503                prune_tx_or_event_indice_table![
1504                    tx_calls_pkg,
1505                    conn,
1506                    min_tx,
1507                    max_tx,
1508                    "Failed to prune tx_calls_pkg table"
1509                ];
1510                prune_tx_or_event_indice_table![
1511                    tx_calls_mod,
1512                    conn,
1513                    min_tx,
1514                    max_tx,
1515                    "Failed to prune tx_calls_mod table"
1516                ];
1517                prune_tx_or_event_indice_table![
1518                    tx_calls_fun,
1519                    conn,
1520                    min_tx,
1521                    max_tx,
1522                    "Failed to prune tx_calls_fun table"
1523                ];
1524                prune_tx_or_event_indice_table![
1525                    tx_digests,
1526                    conn,
1527                    min_tx,
1528                    max_tx,
1529                    "Failed to prune tx_digests table"
1530                ];
1531                Ok::<(), IndexerError>(())
1532            },
1533            PG_DB_COMMIT_SLEEP_DURATION
1534        )
1535    }
1536
1537    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1538        transactional_blocking_with_retry!(
1539            &self.blocking_cp,
1540            |conn| {
1541                diesel::delete(
1542                    pruner_cp_watermark::table
1543                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1544                )
1545                .execute(conn)
1546                .map_err(IndexerError::from)
1547                .context("Failed to prune pruner_cp_watermark table")?;
1548                Ok::<(), IndexerError>(())
1549            },
1550            PG_DB_COMMIT_SLEEP_DURATION
1551        )
1552    }
1553
1554    fn get_network_total_transactions_by_end_of_epoch(
1555        &self,
1556        epoch: u64,
1557    ) -> Result<Option<u64>, IndexerError> {
1558        read_only_blocking!(&self.blocking_cp, |conn| {
1559            epochs::table
1560                .filter(epochs::epoch.eq(epoch as i64))
1561                .select(epochs::network_total_transactions)
1562                .get_result::<Option<i64>>(conn)
1563        })
1564        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
1565        .map(|option| option.map(|v| v as u64))
1566    }
1567
1568    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1569        transactional_blocking_with_retry!(
1570            &self.blocking_cp,
1571            |conn| {
1572                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1573                    .execute(conn)?;
1574                Ok::<(), IndexerError>(())
1575            },
1576            PG_DB_COMMIT_SLEEP_DURATION
1577        )
1578        .tap_ok(|_| {
1579            info!("Successfully refreshed participation_metrics");
1580        })
1581        .tap_err(|e| {
1582            tracing::error!("failed to refresh participation_metrics: {e}");
1583        })
1584    }
1585
1586    fn update_watermarks_upper_bound<E: IntoEnumIterator>(
1587        &self,
1588        watermark: CommitterWatermark,
1589    ) -> Result<(), IndexerError>
1590    where
1591        E::Iterator: Iterator<Item: AsRef<str>>,
1592    {
1593        let guard = self
1594            .metrics
1595            .checkpoint_db_commit_latency_watermarks
1596            .start_timer();
1597
1598        let upper_bound_updates = E::iter()
1599            .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
1600            .collect::<Vec<_>>();
1601
1602        transactional_blocking_with_retry!(
1603            &self.blocking_cp,
1604            |conn| {
1605                diesel::insert_into(watermarks::table)
1606                    .values(&upper_bound_updates)
1607                    .on_conflict(watermarks::entity)
1608                    .do_update()
1609                    .set((
1610                        watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
1611                        watermarks::checkpoint_hi_inclusive
1612                            .eq(excluded(watermarks::checkpoint_hi_inclusive)),
1613                        watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
1614                    ))
1615                    .execute(conn)
1616                    .map_err(IndexerError::from)
1617                    .context("Failed to update watermarks upper bound")?;
1618                Ok::<(), IndexerError>(())
1619            },
1620            PG_DB_COMMIT_SLEEP_DURATION
1621        )
1622        .tap_ok(|_| {
1623            let elapsed = guard.stop_and_record();
1624            info!(elapsed, "Persisted watermarks");
1625        })
1626        .tap_err(|e| {
1627            tracing::error!("Failed to persist watermarks with error: {}", e);
1628        })
1629    }
1630
1631    fn map_epochs_to_cp_tx(
1632        &self,
1633        epochs: &[u64],
1634    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
1635        let pool = &self.blocking_cp;
1636        let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
1637            epochs::table
1638                .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
1639                .select((
1640                    epochs::epoch,
1641                    epochs::first_checkpoint_id,
1642                    epochs::first_tx_sequence_number,
1643                ))
1644                .load::<(i64, i64, i64)>(conn)
1645        })
1646        .context("Failed to fetch first checkpoint and tx seq num for epochs")?;
1647
1648        Ok(results
1649            .into_iter()
1650            .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
1651            .collect())
1652    }
1653
1654    fn update_watermarks_lower_bound(
1655        &self,
1656        watermarks: Vec<(PrunableTable, u64)>,
1657    ) -> Result<(), IndexerError> {
1658        use diesel::query_dsl::methods::FilterDsl;
1659
1660        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
1661        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
1662        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
1663            .into_iter()
1664            .map(|(table, epoch)| {
1665                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
1666                    IndexerError::PersistentStorageDataCorruption(format!(
1667                        "epoch {epoch} not found in epoch mapping",
1668                    ))
1669                })?;
1670                Ok(StoredWatermark::from_lower_bound_update(
1671                    table.as_ref(),
1672                    epoch,
1673                    table.select_reader_lo(*checkpoint, *tx),
1674                ))
1675            })
1676            .collect();
1677        let lower_bound_updates = lookups?;
1678        let guard = self
1679            .metrics
1680            .checkpoint_db_commit_latency_watermarks
1681            .start_timer();
1682        transactional_blocking_with_retry!(
1683            &self.blocking_cp,
1684            |conn| {
1685                diesel::insert_into(watermarks::table)
1686                    .values(&lower_bound_updates)
1687                    .on_conflict(watermarks::entity)
1688                    .do_update()
1689                    .set((
1690                        watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
1691                        watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
1692                        watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
1693                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1694                        )),
1695                    ))
1696                    .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
1697                    .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
1698                    .filter(
1699                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
1700                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1701                        )
1702                        .gt(watermarks::timestamp_ms),
1703                    )
1704                    .execute(conn)
1705            },
1706            PG_DB_COMMIT_SLEEP_DURATION
1707        )
1708        .tap_ok(|_| {
1709            let elapsed = guard.stop_and_record();
1710            info!(elapsed, "Persisted watermarks");
1711        })
1712        .tap_err(|e| {
1713            tracing::error!("Failed to persist watermarks with error: {}", e);
1714        })?;
1715        Ok(())
1716    }
1717
1718    fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
1719        // read_only transaction, otherwise this will block and get blocked by write
1720        // transactions to the same table.
1721        run_query_with_retry!(
1722            &self.blocking_cp,
1723            |conn| {
1724                let stored = watermarks::table
1725                    .load::<StoredWatermark>(conn)
1726                    .map_err(Into::into)
1727                    .context("Failed reading watermarks from PostgresDB")?;
1728                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
1729                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1730                ))
1731                .get_result(conn)
1732                .map_err(Into::into)
1733                .context("Failed reading current timestamp from PostgresDB")?;
1734                Ok::<_, IndexerError>((stored, timestamp))
1735            },
1736            PG_DB_COMMIT_SLEEP_DURATION
1737        )
1738    }
1739
1740    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1741    where
1742        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1743        R: Send + 'static,
1744    {
1745        let this = self.clone();
1746        let current_span = tracing::Span::current();
1747        tokio::task::spawn_blocking(move || {
1748            mark_in_blocking_pool();
1749            let _guard = current_span.enter();
1750            f(this)
1751        })
1752        .await
1753        .map_err(Into::into)
1754        .and_then(std::convert::identity)
1755    }
1756
1757    fn spawn_blocking_task<F, R>(
1758        &self,
1759        f: F,
1760    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1761    where
1762        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1763        R: Send + 'static,
1764    {
1765        let this = self.clone();
1766        let current_span = tracing::Span::current();
1767        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1768        tokio::task::spawn_blocking(move || {
1769            mark_in_blocking_pool();
1770            let _guard = current_span.enter();
1771            let _elapsed = guard.stop_and_record();
1772            f(this)
1773        })
1774    }
1775
1776    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1777    where
1778        F: FnOnce(Self) -> Fut + Send + 'static,
1779        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1780        R: Send + 'static,
1781    {
1782        let this = self.clone();
1783        tokio::task::spawn(async move { f(this).await })
1784    }
1785}
1786
1787#[async_trait]
1788impl IndexerStore for PgIndexerStore {
1789    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1790        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1791            .await
1792    }
1793
1794    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1795        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1796            .await
1797    }
1798
1799    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1800        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1801            .await
1802    }
1803
1804    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1805        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1806            .await
1807    }
1808
1809    async fn get_latest_object_snapshot_watermark(
1810        &self,
1811    ) -> Result<Option<CommitterWatermark>, IndexerError> {
1812        self.execute_in_blocking_worker(|this| this.get_latest_object_snapshot_watermark())
1813            .await
1814    }
1815
1816    async fn get_latest_object_snapshot_checkpoint_sequence_number(
1817        &self,
1818    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
1819        self.execute_in_blocking_worker(|this| {
1820            this.get_latest_object_snapshot_checkpoint_sequence_number()
1821        })
1822        .await
1823    }
1824
1825    fn persist_objects_in_existing_transaction(
1826        &self,
1827        conn: &mut PgConnection,
1828        object_changes: Vec<TransactionObjectChangesToCommit>,
1829    ) -> Result<(), IndexerError> {
1830        if object_changes.is_empty() {
1831            return Ok(());
1832        }
1833
1834        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1835        let object_mutations = indexed_mutations
1836            .into_iter()
1837            .map(StoredObject::from)
1838            .collect::<Vec<_>>();
1839        let object_deletions = indexed_deletions
1840            .into_iter()
1841            .map(StoredDeletedObject::from)
1842            .collect::<Vec<_>>();
1843
1844        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1845        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1846
1847        Ok(())
1848    }
1849
1850    async fn persist_objects_snapshot(
1851        &self,
1852        object_changes: Vec<TransactionObjectChangesToCommit>,
1853    ) -> Result<(), IndexerError> {
1854        if object_changes.is_empty() {
1855            return Ok(());
1856        }
1857        let guard = self
1858            .metrics
1859            .checkpoint_db_commit_latency_objects_snapshot
1860            .start_timer();
1861        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1862        let objects_snapshot = indexed_mutations
1863            .into_iter()
1864            .map(StoredObjectSnapshot::from)
1865            .chain(
1866                indexed_deletions
1867                    .into_iter()
1868                    .map(StoredObjectSnapshot::from),
1869            )
1870            .collect::<Vec<_>>();
1871        let len = objects_snapshot.len();
1872        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1873        let futures = chunks
1874            .into_iter()
1875            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1876
1877        futures::future::try_join_all(futures)
1878            .await
1879            .map_err(|e| {
1880                tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
1881                IndexerError::from(e)
1882            })?
1883            .into_iter()
1884            .collect::<Result<Vec<_>, _>>()
1885            .map_err(|e| {
1886                IndexerError::PostgresWrite(format!(
1887                    "Failed to persist all objects snapshot chunks: {e:?}"
1888                ))
1889            })?;
1890        let elapsed = guard.stop_and_record();
1891        info!(elapsed, "Persisted {} objects snapshot", len);
1892        Ok(())
1893    }
1894
1895    async fn persist_object_history(
1896        &self,
1897        object_changes: Vec<TransactionObjectChangesToCommit>,
1898    ) -> Result<(), IndexerError> {
1899        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1900            .map(|val| val.eq_ignore_ascii_case("true"))
1901            .unwrap_or(false);
1902        if skip_history {
1903            info!("skipping object history");
1904            return Ok(());
1905        }
1906
1907        if object_changes.is_empty() {
1908            return Ok(());
1909        }
1910        let objects = make_objects_history_to_commit(object_changes);
1911        let guard = self
1912            .metrics
1913            .checkpoint_db_commit_latency_objects_history
1914            .start_timer();
1915
1916        let len = objects.len();
1917        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1918        let futures = chunks
1919            .into_iter()
1920            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1921
1922        futures::future::try_join_all(futures)
1923            .await
1924            .map_err(|e| {
1925                tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
1926                IndexerError::from(e)
1927            })?
1928            .into_iter()
1929            .collect::<Result<Vec<_>, _>>()
1930            .map_err(|e| {
1931                IndexerError::PostgresWrite(format!(
1932                    "Failed to persist all objects history chunks: {e:?}"
1933                ))
1934            })?;
1935        let elapsed = guard.stop_and_record();
1936        info!(elapsed, "Persisted {} objects history", len);
1937        Ok(())
1938    }
1939
1940    async fn persist_object_versions(
1941        &self,
1942        object_versions: Vec<StoredObjectVersion>,
1943    ) -> Result<(), IndexerError> {
1944        if object_versions.is_empty() {
1945            return Ok(());
1946        }
1947
1948        let guard = self
1949            .metrics
1950            .checkpoint_db_commit_latency_objects_version
1951            .start_timer();
1952
1953        let object_versions_count = object_versions.len();
1954
1955        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1956        let futures = chunks
1957            .into_iter()
1958            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1959            .collect::<Vec<_>>();
1960
1961        futures::future::try_join_all(futures)
1962            .await
1963            .map_err(|e| {
1964                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
1965                IndexerError::from(e)
1966            })?
1967            .into_iter()
1968            .collect::<Result<Vec<_>, _>>()
1969            .map_err(|e| {
1970                IndexerError::PostgresWrite(format!(
1971                    "Failed to persist all objects version chunks: {e:?}"
1972                ))
1973            })?;
1974        let elapsed = guard.stop_and_record();
1975        info!(elapsed, "Persisted {object_versions_count} object versions");
1976        Ok(())
1977    }
1978
1979    async fn persist_checkpoints(
1980        &self,
1981        checkpoints: Vec<IndexedCheckpoint>,
1982    ) -> Result<(), IndexerError> {
1983        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1984            .await
1985    }
1986
1987    async fn persist_transactions(
1988        &self,
1989        transactions: Vec<IndexedTransaction>,
1990    ) -> Result<(), IndexerError> {
1991        let guard = self
1992            .metrics
1993            .checkpoint_db_commit_latency_transactions
1994            .start_timer();
1995        let len = transactions.len();
1996
1997        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1998        let futures = chunks
1999            .into_iter()
2000            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
2001
2002        futures::future::try_join_all(futures)
2003            .await
2004            .map_err(|e| {
2005                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
2006                IndexerError::from(e)
2007            })?
2008            .into_iter()
2009            .collect::<Result<Vec<_>, _>>()
2010            .map_err(|e| {
2011                IndexerError::PostgresWrite(format!(
2012                    "Failed to persist all transactions chunks: {e:?}"
2013                ))
2014            })?;
2015        let elapsed = guard.stop_and_record();
2016        info!(elapsed, "Persisted {} transactions", len);
2017        Ok(())
2018    }
2019
    /// Inserts `transaction` into the `optimistic_transactions` table using
    /// the caller-provided connection `conn`.
    ///
    /// The insert goes through `insert_or_ignore_into!`.
    // NOTE(review): presumably a conflicting existing row is left untouched,
    // as the macro name suggests - confirm against the macro definition.
    fn persist_optimistic_transaction_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
        Ok(())
    }
2028
2029    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
2030        if events.is_empty() {
2031            return Ok(());
2032        }
2033        let len = events.len();
2034        let guard = self
2035            .metrics
2036            .checkpoint_db_commit_latency_events
2037            .start_timer();
2038        let chunks = chunk!(events, self.config.parallel_chunk_size);
2039        let futures = chunks
2040            .into_iter()
2041            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
2042
2043        futures::future::try_join_all(futures)
2044            .await
2045            .map_err(|e| {
2046                tracing::error!("failed to join persist_events_chunk futures: {e}");
2047                IndexerError::from(e)
2048            })?
2049            .into_iter()
2050            .collect::<Result<Vec<_>, _>>()
2051            .map_err(|e| {
2052                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
2053            })?;
2054        let elapsed = guard.stop_and_record();
2055        info!(elapsed, "Persisted {} events", len);
2056        Ok(())
2057    }
2058
2059    async fn persist_displays(
2060        &self,
2061        display_updates: BTreeMap<String, StoredDisplay>,
2062    ) -> Result<(), IndexerError> {
2063        if display_updates.is_empty() {
2064            return Ok(());
2065        }
2066
2067        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
2068            .await?
2069    }
2070
2071    fn persist_displays_in_existing_transaction(
2072        &self,
2073        conn: &mut PgConnection,
2074        display_updates: Vec<&StoredDisplay>,
2075    ) -> Result<(), IndexerError> {
2076        if display_updates.is_empty() {
2077            return Ok(());
2078        }
2079
2080        on_conflict_do_update_with_condition!(
2081            display::table,
2082            display_updates,
2083            display::object_type,
2084            (
2085                display::id.eq(excluded(display::id)),
2086                display::version.eq(excluded(display::version)),
2087                display::bcs.eq(excluded(display::bcs)),
2088            ),
2089            excluded(display::version).gt(display::version),
2090            conn
2091        );
2092
2093        Ok(())
2094    }
2095
2096    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
2097        if packages.is_empty() {
2098            return Ok(());
2099        }
2100        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
2101            .await
2102    }
2103
2104    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
2105        if indices.is_empty() {
2106            return Ok(());
2107        }
2108        let len = indices.len();
2109        let guard = self
2110            .metrics
2111            .checkpoint_db_commit_latency_event_indices
2112            .start_timer();
2113        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2114
2115        let futures = chunks.into_iter().map(|chunk| {
2116            self.spawn_task(move |this: Self| async move {
2117                this.persist_event_indices_chunk(chunk).await
2118            })
2119        });
2120
2121        futures::future::try_join_all(futures)
2122            .await
2123            .map_err(|e| {
2124                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
2125                IndexerError::from(e)
2126            })?
2127            .into_iter()
2128            .collect::<Result<Vec<_>, _>>()
2129            .map_err(|e| {
2130                IndexerError::PostgresWrite(format!(
2131                    "Failed to persist all event_indices chunks: {e:?}"
2132                ))
2133            })?;
2134        let elapsed = guard.stop_and_record();
2135        info!(elapsed, "Persisted {} event_indices chunks", len);
2136        Ok(())
2137    }
2138
2139    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2140        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2141            .await
2142    }
2143
2144    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2145        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2146            .await
2147    }
2148
2149    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
2150        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
2151            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
2152            _ => Err(IndexerError::PostgresRead(format!(
2153                "Failed to get checkpoint range for epoch {epoch}"
2154            ))),
2155        }?;
2156
2157        // NOTE: for disaster recovery, min_cp is the min cp of the current epoch, which
2158        // is likely partially pruned already. min_prunable_cp is the min cp to
2159        // be pruned. By std::cmp::max, we will resume the pruning process from
2160        // the next checkpoint, instead of the first cp of the current epoch.
2161        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
2162        min_cp = std::cmp::max(min_cp, min_prunable_cp);
2163        for cp in min_cp..=max_cp {
2164            // NOTE: the order of pruning tables is crucial:
2165            // 1. prune checkpoints table, checkpoints table is the source table of
2166            //    available range,
2167            // we prune it first to make sure that we always have full data for checkpoints
2168            // within the available range;
2169            // 2. then prune tx_* tables;
2170            // 3. then prune pruner_cp_watermark table, which is the checkpoint pruning
2171            //    watermark table and also tx seq source
2172            // of a checkpoint to prune tx_* tables;
2173            // 4. lastly we prune epochs table when all checkpoints of the epoch have been
2174            //    pruned.
2175            info!(
2176                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
2177                cp, epoch, min_prunable_cp
2178            );
2179            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
2180                .await
2181                .unwrap_or_else(|e| {
2182                    tracing::error!("failed to prune checkpoint {cp}: {e}");
2183                });
2184
2185            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
2186            self.execute_in_blocking_worker(move |this| {
2187                this.prune_tx_indices_table(min_tx, max_tx)
2188            })
2189            .await
2190            .unwrap_or_else(|e| {
2191                tracing::error!("failed to prune transactions for cp {cp}: {e}");
2192            });
2193            info!(
2194                "Pruned transactions for checkpoint {} from tx {} to tx {}",
2195                cp, min_tx, max_tx
2196            );
2197            self.execute_in_blocking_worker(move |this| {
2198                this.prune_event_indices_table(min_tx, max_tx)
2199            })
2200            .await
2201            .unwrap_or_else(|e| {
2202                tracing::error!("failed to prune events of transactions for cp {cp}: {e}");
2203            });
2204            info!(
2205                "Pruned events of transactions for checkpoint {cp} from tx {min_tx} to tx {max_tx}"
2206            );
2207            self.metrics.last_pruned_transaction.set(max_tx as i64);
2208
2209            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
2210                .await
2211                .unwrap_or_else(|e| {
2212                    tracing::error!("failed to prune pruner_cp_watermark table for cp {cp}: {e}");
2213                });
2214            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2215            self.metrics.last_pruned_checkpoint.set(cp as i64);
2216        }
2217
2218        Ok(())
2219    }
2220
2221    async fn get_network_total_transactions_by_end_of_epoch(
2222        &self,
2223        epoch: u64,
2224    ) -> Result<Option<u64>, IndexerError> {
2225        self.execute_in_blocking_worker(move |this| {
2226            this.get_network_total_transactions_by_end_of_epoch(epoch)
2227        })
2228        .await
2229    }
2230
2231    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2232        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2233            .await
2234    }
2235
2236    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
2237        &self,
2238        watermark: CommitterWatermark,
2239    ) -> Result<(), IndexerError>
2240    where
2241        E::Iterator: Iterator<Item: AsRef<str>>,
2242    {
2243        self.execute_in_blocking_worker(move |this| {
2244            this.update_watermarks_upper_bound::<E>(watermark)
2245        })
2246        .await
2247    }
2248
    /// Returns this store as a `&dyn Any` trait object.
    fn as_any(&self) -> &dyn StdAny {
        self
    }
2252
2253    /// Persist protocol configs and feature flags until the protocol version
2254    /// for the latest epoch we have stored in the db, inclusive.
2255    fn persist_protocol_configs_and_feature_flags(
2256        &self,
2257        chain_id: Vec<u8>,
2258    ) -> Result<(), IndexerError> {
2259        let chain_id = ChainIdentifier::from(
2260            CheckpointDigest::try_from(chain_id).expect("unable to convert chain id"),
2261        );
2262
2263        let mut all_configs = vec![];
2264        let mut all_flags = vec![];
2265
2266        let (start_version, end_version) = self.get_protocol_version_index_range()?;
2267        info!(
2268            "Persisting protocol configs with start_version: {}, end_version: {}",
2269            start_version, end_version
2270        );
2271
2272        // Gather all protocol configs and feature flags for all versions between start
2273        // and end.
2274        for version in start_version..=end_version {
2275            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2276                (version as u64).into(),
2277                chain_id.chain(),
2278            )
2279            .ok_or(IndexerError::Generic(format!(
2280                "Unable to fetch protocol version {} and chain {:?}",
2281                version,
2282                chain_id.chain()
2283            )))?;
2284            let configs_vec = protocol_configs
2285                .attr_map()
2286                .into_iter()
2287                .map(|(k, v)| StoredProtocolConfig {
2288                    protocol_version: version,
2289                    config_name: k,
2290                    config_value: v.map(|v| v.to_string()),
2291                })
2292                .collect::<Vec<_>>();
2293            all_configs.extend(configs_vec);
2294
2295            let feature_flags = protocol_configs
2296                .feature_map()
2297                .into_iter()
2298                .map(|(k, v)| StoredFeatureFlag {
2299                    protocol_version: version,
2300                    flag_name: k,
2301                    flag_value: v,
2302                })
2303                .collect::<Vec<_>>();
2304            all_flags.extend(feature_flags);
2305        }
2306
2307        transactional_blocking_with_retry!(
2308            &self.blocking_cp,
2309            |conn| {
2310                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2311                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2312                }
2313                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
2314                Ok::<(), IndexerError>(())
2315            },
2316            PG_DB_COMMIT_SLEEP_DURATION
2317        )?;
2318        Ok(())
2319    }
2320
2321    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2322        if indices.is_empty() {
2323            return Ok(());
2324        }
2325        let len = indices.len();
2326        let guard = self
2327            .metrics
2328            .checkpoint_db_commit_latency_tx_indices
2329            .start_timer();
2330        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2331
2332        let futures = chunks.into_iter().map(|chunk| {
2333            self.spawn_task(move |this: Self| async move {
2334                this.persist_tx_indices_chunk_v2(chunk).await
2335            })
2336        });
2337        futures::future::try_join_all(futures)
2338            .await
2339            .map_err(|e| {
2340                tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
2341                IndexerError::from(e)
2342            })?
2343            .into_iter()
2344            .collect::<Result<Vec<_>, _>>()
2345            .map_err(|e| {
2346                IndexerError::PostgresWrite(format!(
2347                    "Failed to persist all tx_indices chunks: {e:?}"
2348                ))
2349            })?;
2350        let elapsed = guard.stop_and_record();
2351        info!(elapsed, "Persisted {} tx_indices chunks", len);
2352        Ok(())
2353    }
2354
2355    async fn persist_checkpoint_objects(
2356        &self,
2357        objects: Vec<CheckpointObjectChanges>,
2358    ) -> Result<(), IndexerError> {
2359        if objects.is_empty() {
2360            return Ok(());
2361        }
2362        let guard = self
2363            .metrics
2364            .checkpoint_db_commit_latency_objects
2365            .start_timer();
2366        let CheckpointObjectChanges {
2367            changed_objects: mutations,
2368            deleted_objects: deletions,
2369        } = retain_latest_objects_from_checkpoint_batch(objects);
2370        let mutation_len = mutations.len();
2371        let deletion_len = deletions.len();
2372
2373        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
2374        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
2375        let mutation_futures = mutation_chunks
2376            .into_iter()
2377            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
2378        let deletion_futures = deletion_chunks
2379            .into_iter()
2380            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
2381        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
2382            .await
2383            .map_err(|e| {
2384                tracing::error!("failed to join futures for persisting objects: {e}");
2385                IndexerError::from(e)
2386            })?
2387            .into_iter()
2388            .collect::<Result<Vec<_>, _>>()
2389            .map_err(|e| {
2390                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}",))
2391            })?;
2392
2393        let elapsed = guard.stop_and_record();
2394        info!(
2395            elapsed,
2396            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
2397        );
2398        Ok(())
2399    }
2400
2401    async fn update_status_for_checkpoint_transactions(
2402        &self,
2403        tx_order: Vec<CheckpointTxGlobalOrder>,
2404    ) -> Result<(), IndexerError> {
2405        let guard = self
2406            .metrics
2407            .checkpoint_db_commit_latency_tx_insertion_order
2408            .start_timer();
2409        let len = tx_order.len();
2410
2411        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2412        let futures = chunks.into_iter().map(|c| {
2413            self.spawn_blocking_task(move |this| {
2414                this.update_status_for_checkpoint_transactions_chunk(c)
2415            })
2416        });
2417
2418        futures::future::try_join_all(futures)
2419            .await
2420            .map_err(|e| {
2421                tracing::error!(
2422                    "failed to join update_status_for_checkpoint_transactions_chunk futures: {e}",
2423                );
2424                IndexerError::from(e)
2425            })?
2426            .into_iter()
2427            .collect::<Result<Vec<_>, _>>()
2428            .map_err(|e| {
2429                IndexerError::PostgresWrite(format!(
2430                    "Failed to update all `tx_global_order` chunks: {e:?}",
2431                ))
2432            })?;
2433        let elapsed = guard.stop_and_record();
2434        info!(
2435            elapsed,
2436            "Updated index status for {len} txs insertion orders"
2437        );
2438        Ok(())
2439    }
2440
2441    async fn persist_tx_global_order(
2442        &self,
2443        tx_order: Vec<CheckpointTxGlobalOrder>,
2444    ) -> Result<(), IndexerError> {
2445        let guard = self
2446            .metrics
2447            .checkpoint_db_commit_latency_tx_insertion_order
2448            .start_timer();
2449        let len = tx_order.len();
2450
2451        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2452        let futures = chunks
2453            .into_iter()
2454            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));
2455
2456        futures::future::try_join_all(futures)
2457            .await
2458            .map_err(|e| {
2459                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}",);
2460                IndexerError::from(e)
2461            })?
2462            .into_iter()
2463            .collect::<Result<Vec<_>, _>>()
2464            .map_err(|e| {
2465                IndexerError::PostgresWrite(format!(
2466                    "Failed to persist all txs insertion order chunks: {e:?}",
2467                ))
2468            })?;
2469        let elapsed = guard.stop_and_record();
2470        info!(elapsed, "Persisted {len} txs insertion orders");
2471        Ok(())
2472    }
2473
2474    async fn update_watermarks_lower_bound(
2475        &self,
2476        watermarks: Vec<(PrunableTable, u64)>,
2477    ) -> Result<(), IndexerError> {
2478        self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
2479            .await
2480    }
2481
2482    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
2483        self.execute_in_blocking_worker(move |this| this.get_watermarks())
2484            .await
2485    }
2486}
2487
2488fn make_objects_history_to_commit(
2489    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2490) -> Vec<StoredHistoryObject> {
2491    let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2492        .clone()
2493        .into_iter()
2494        .flat_map(|changes| changes.deleted_objects)
2495        .map(|o| o.into())
2496        .collect();
2497    let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2498        .into_iter()
2499        .flat_map(|changes| changes.changed_objects)
2500        .map(|o| o.into())
2501        .collect();
2502    deleted_objects.into_iter().chain(mutated_objects).collect()
2503}
2504
2505/// Partitions object changes into deletions and mutations.
2506///
2507/// Retains only the highest version of each object among deletions and
2508/// mutations. This allows concurrent insertion into the DB of the resulting
2509/// partitions.
2510fn retain_latest_indexed_objects(
2511    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2512) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2513    use std::collections::HashMap;
2514
2515    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
2516    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();
2517
2518    for change in tx_object_changes {
2519        // Remove mutation / deletion with a following deletion / mutation,
2520        // as we expect that following deletion / mutation has a higher version.
2521        // Technically, assertions below are not required, double check just in case.
2522        for mutation in change.changed_objects {
2523            let id = mutation.object.id();
2524            let version = mutation.object.version();
2525
2526            if let Some(existing) = deletions.remove(&id) {
2527                assert!(
2528                    existing.object_version < version.value(),
2529                    "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2530                    existing.object_version
2531                );
2532            }
2533
2534            if let Some(existing) = mutations.insert(id, mutation) {
2535                assert!(
2536                    existing.object.version() < version,
2537                    "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2538                    existing.object.version()
2539                );
2540            }
2541        }
2542        // Handle deleted objects
2543        for deletion in change.deleted_objects {
2544            let id = deletion.object_id;
2545            let version = deletion.object_version;
2546
2547            if let Some(existing) = mutations.remove(&id) {
2548                assert!(
2549                    existing.object.version().value() < version,
2550                    "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2551                    existing.object.version(),
2552                );
2553            }
2554
2555            if let Some(existing) = deletions.insert(id, deletion) {
2556                assert!(
2557                    existing.object_version < version,
2558                    "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2559                    existing.object_version
2560                );
2561            }
2562        }
2563    }
2564
2565    (
2566        mutations.into_values().collect(),
2567        deletions.into_values().collect(),
2568    )
2569}