iota_indexer/store/
pg_indexer_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use core::result::Result::Ok;
6use std::{
7    any::Any as StdAny,
8    collections::{BTreeMap, HashMap},
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use diesel::{
14    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
15    dsl::{max, min, sql},
16    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
17    upsert::excluded,
18};
19use downcast::Any;
20use iota_protocol_config::ProtocolConfig;
21use iota_types::{
22    base_types::ObjectID,
23    digests::{ChainIdentifier, CheckpointDigest},
24    messages_checkpoint::CheckpointSequenceNumber,
25};
26use itertools::Itertools;
27use strum::IntoEnumIterator;
28use tap::TapFallible;
29use tracing::info;
30
31use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
32use crate::{
33    blocking_call_is_ok_or_panic,
34    db::ConnectionPool,
35    errors::{Context, IndexerError},
36    ingestion::{
37        common::{
38            persist::{CommitterWatermark, ObjectsSnapshotHandlerTables},
39            prepare::{
40                CheckpointObjectChanges, LiveObject, RemovedObject,
41                retain_latest_objects_from_checkpoint_batch,
42            },
43        },
44        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
45    },
46    insert_or_ignore_into,
47    metrics::IndexerMetrics,
48    models::{
49        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
50        display::StoredDisplay,
51        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
52        events::StoredEvent,
53        obj_indices::StoredObjectVersion,
54        objects::{
55            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
56            StoredObjects,
57        },
58        packages::StoredPackage,
59        transactions::{OptimisticTransaction, StoredTransaction, TxGlobalOrder},
60        tx_indices::TxIndexSplit,
61        watermarks::StoredWatermark,
62    },
63    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
64    persist_chunk_into_table_in_existing_connection,
65    pruning::pruner::PrunableTable,
66    read_only_blocking, run_query, run_query_with_retry,
67    schema::{
68        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
69        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
70        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
71        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
72        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
73        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
74        tx_wrapped_or_deleted_objects, watermarks,
75    },
76    store::{IndexerStore, diesel_macro::mark_in_blocking_pool},
77    transactional_blocking_with_retry,
78    types::{
79        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
80        IndexedPackage, IndexedTransaction, TxIndex,
81    },
82};
83
/// A cursor representing the global order position of transaction according to
/// tx_global_order table
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TxGlobalOrderCursor {
    // Position in the finalized (global) transaction ordering.
    pub global_sequence_number: i64,
    // Position in the optimistic ordering; NOTE(review): -1 appears to be used
    // as a "not optimistic / finalized" sentinel elsewhere in this file —
    // confirm against the tx_global_order schema.
    pub optimistic_sequence_number: i64,
}
91
/// Splits `$data` into a `Vec<Vec<_>>` of chunks holding at most `$size`
/// elements each (the last chunk may be shorter).
///
/// Relies on `itertools::Itertools::chunks`, so `Itertools` must be in scope
/// at the call site. Exported for reuse by other modules of this crate.
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}
103
/// Deletes every row of `$table` whose `tx_sequence_number` falls in the
/// inclusive range [`$min_tx`, `$max_tx`], converting any diesel error into an
/// `IndexerError` annotated with `$context_msg`.
///
/// Expands to a statement ending in `?`, so it must be invoked inside a
/// function returning `Result<_, IndexerError>` with `$conn` being a live
/// connection.
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}
112
// In one DB transaction, the update could be chunked into
// a few statements, this is the amount of rows to update in one statement
// TODO: I think with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX`
// is now less relevant. We should do experiments and remove it if it's true.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
// The amount of rows to update in one DB transaction
// (default; overridable via the `PG_COMMIT_PARALLEL_CHUNK_SIZE` env var in
// `PgIndexerStore::new`).
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
// The amount of rows to update in one DB transaction, for objects particularly
// Having this number too high may cause many db deadlocks because of
// optimistic locking.
// (default; overridable via the `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` env
// var in `PgIndexerStore::new`).
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
// Passed to `transactional_blocking_with_retry!` by every commit path below.
// NOTE(review): presumably the overall retry/sleep budget (1 hour) — confirm
// against the macro's definition.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
125
/// Tunable batching parameters for `PgIndexerStore`, resolved once in
/// `PgIndexerStore::new` from env vars (falling back to the module constants).
#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    // Rows committed per DB transaction for most tables.
    pub parallel_chunk_size: usize,
    // Rows committed per DB transaction for object tables (kept smaller to
    // reduce deadlocks under optimistic locking).
    pub parallel_objects_chunk_size: usize,
}
131
/// PostgreSQL-backed implementation of the indexer store: persists
/// checkpoints, transactions, objects, events and related index tables
/// through a blocking diesel connection pool.
pub struct PgIndexerStore {
    // Blocking (non-async) connection pool used by all read/write paths.
    blocking_cp: ConnectionPool,
    // Prometheus-style metrics for commit latencies and lag.
    metrics: IndexerMetrics,
    // Manages epoch-based table partitions (e.g. objects_history).
    partition_manager: PgPartitionManager,
    // Batching configuration resolved at construction time.
    config: PgIndexerStoreConfig,
}
138
139impl Clone for PgIndexerStore {
140    fn clone(&self) -> PgIndexerStore {
141        Self {
142            blocking_cp: self.blocking_cp.clone(),
143            metrics: self.metrics.clone(),
144            partition_manager: self.partition_manager.clone(),
145            config: self.config.clone(),
146        }
147    }
148}
149
150impl PgIndexerStore {
151    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
152        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
153            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
154            .parse::<usize>()
155            .unwrap();
156        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
157            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
158            .parse::<usize>()
159            .unwrap();
160        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
161            .expect("failed to initialize partition manager");
162        let config = PgIndexerStoreConfig {
163            parallel_chunk_size,
164            parallel_objects_chunk_size,
165        };
166
167        Self {
168            blocking_cp,
169            metrics,
170            partition_manager,
171            config,
172        }
173    }
174
175    pub fn get_metrics(&self) -> IndexerMetrics {
176        self.metrics.clone()
177    }
178
179    pub fn blocking_cp(&self) -> ConnectionPool {
180        self.blocking_cp.clone()
181    }
182
183    /// Get the range of the protocol versions that need to be indexed.
184    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
185        // We start indexing from the next protocol version after the latest one stored
186        // in the db.
187        let start = read_only_blocking!(&self.blocking_cp, |conn| {
188            protocol_configs::dsl::protocol_configs
189                .select(max(protocol_configs::protocol_version))
190                .first::<Option<i64>>(conn)
191        })
192        .context("Failed reading latest protocol version from PostgresDB")?
193        .map_or(1, |v| v + 1);
194
195        // We end indexing at the protocol version of the latest epoch stored in the db.
196        let end = read_only_blocking!(&self.blocking_cp, |conn| {
197            epochs::dsl::epochs
198                .select(max(epochs::protocol_version))
199                .first::<Option<i64>>(conn)
200        })
201        .context("Failed reading latest epoch protocol version from PostgresDB")?
202        .unwrap_or(1);
203        Ok((start, end))
204    }
205
    /// Reads the chain identifier (the stored checkpoint digest) from the
    /// `chain_identifier` table; returns `Ok(None)` while the table is empty.
    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                // `optional()` maps diesel's NotFound to Ok(None).
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }
215
216    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
217        read_only_blocking!(&self.blocking_cp, |conn| {
218            checkpoints::dsl::checkpoints
219                .select(max(checkpoints::sequence_number))
220                .first::<Option<i64>>(conn)
221                .map(|v| v.map(|v| v as u64))
222        })
223        .context("Failed reading latest checkpoint sequence number from PostgresDB")
224    }
225
226    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
227        read_only_blocking!(&self.blocking_cp, |conn| {
228            checkpoints::dsl::checkpoints
229                .select((
230                    min(checkpoints::sequence_number),
231                    max(checkpoints::sequence_number),
232                ))
233                .first::<(Option<i64>, Option<i64>)>(conn)
234                .map(|(min, max)| {
235                    (
236                        min.unwrap_or_default() as u64,
237                        max.unwrap_or_default() as u64,
238                    )
239                })
240        })
241        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
242    }
243
244    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
245        read_only_blocking!(&self.blocking_cp, |conn| {
246            epochs::dsl::epochs
247                .select((min(epochs::epoch), max(epochs::epoch)))
248                .first::<(Option<i64>, Option<i64>)>(conn)
249                .map(|(min, max)| {
250                    (
251                        min.unwrap_or_default() as u64,
252                        max.unwrap_or_default() as u64,
253                    )
254                })
255        })
256        .context("Failed reading min and max epoch numbers from PostgresDB")
257    }
258
    /// Latest committer watermark recorded for the objects-snapshot handler in
    /// the `watermarks` table; `Ok(None)` when no watermark row exists yet.
    fn get_latest_object_snapshot_watermark(
        &self,
    ) -> Result<Option<CommitterWatermark>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            watermarks::table
                .select((
                    watermarks::current_epoch,
                    watermarks::max_committed_cp,
                    watermarks::max_committed_tx,
                ))
                // Watermarks for different handlers share one table, keyed by
                // the `entity` column.
                .filter(
                    watermarks::entity
                        .eq(ObjectsSnapshotHandlerTables::ObjectsSnapshot.to_string()),
                )
                .first::<(i64, i64, i64)>(conn)
                // Handle case where the watermark is not set yet
                .optional()
                .map(|v| {
                    v.map(|(epoch, cp, tx)| CommitterWatermark {
                        current_epoch: epoch as u64,
                        max_committed_cp: cp as u64,
                        max_committed_tx: tx as u64,
                    })
                })
        })
        .context("Failed reading latest object snapshot watermark from PostgresDB")
    }
286
287    fn get_latest_object_snapshot_checkpoint_sequence_number(
288        &self,
289    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
290        read_only_blocking!(&self.blocking_cp, |conn| {
291            objects_snapshot::table
292                .select(max(objects_snapshot::checkpoint_sequence_number))
293                .first::<Option<i64>>(conn)
294                .map(|v| v.map(|v| v as CheckpointSequenceNumber))
295        })
296        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
297    }
298
    /// Persists the given display updates (map values) to the `display` table
    /// within a single retried transaction.
    ///
    /// Only the `StoredDisplay` values are written; the map keys are assumed
    /// to duplicate an identifier carried inside the value — TODO confirm.
    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            {
                let value = display_updates.values().collect::<Vec<_>>();
                |conn| self.persist_displays_in_existing_transaction(conn, value)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }
314
    /// Bulk-upserts live objects into `objects` with one `UNNEST`-based
    /// statement, retried inside a transaction.
    ///
    /// A candidate row is skipped when its producing transaction appears in
    /// `tx_global_order` with an optimistic sequence number other than
    /// NULL/-1 — NOTE(review): presumably meaning an optimistic write already
    /// covers it; confirm the -1 sentinel against the tx_global_order schema.
    /// On conflict, the stored row is replaced only when the incoming
    /// `object_version` is strictly greater (optimistic-locking guard).
    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        // $14 (tx_digest) is consumed only by the LEFT JOIN filter; it is not
        // inserted into `objects`.
        let raw_query = r#"
            INSERT INTO objects (
                object_id,
                object_version,
                object_digest,
                owner_type,
                owner_id,
                object_type,
                object_type_package,
                object_type_module,
                object_type_name,
                serialized_object,
                coin_type,
                coin_balance,
                df_kind,
                finalized_in_cp
            )
            SELECT
                u.object_id,
                u.object_version,
                u.object_digest,
                u.owner_type,
                u.owner_id,
                u.object_type,
                u.object_type_package,
                u.object_type_module,
                u.object_type_name,
                u.serialized_object,
                u.coin_type,
                u.coin_balance,
                u.df_kind,
                u.finalized_in_cp
            FROM UNNEST(
                $1::BYTEA[],
                $2::BIGINT[],
                $3::BYTEA[],
                $4::SMALLINT[],
                $5::BYTEA[],
                $6::TEXT[],
                $7::BYTEA[],
                $8::TEXT[],
                $9::TEXT[],
                $10::BYTEA[],
                $11::TEXT[],
                $12::BIGINT[],
                $13::SMALLINT[],
                $14::BYTEA[],
                $15::BIGINT[]
            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest, finalized_in_cp)
            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
            ON CONFLICT (object_id) DO UPDATE
            SET
                object_version = EXCLUDED.object_version,
                object_digest = EXCLUDED.object_digest,
                owner_type = EXCLUDED.owner_type,
                owner_id = EXCLUDED.owner_id,
                object_type = EXCLUDED.object_type,
                object_type_package = EXCLUDED.object_type_package,
                object_type_module = EXCLUDED.object_type_module,
                object_type_name = EXCLUDED.object_type_name,
                serialized_object = EXCLUDED.serialized_object,
                coin_type = EXCLUDED.coin_type,
                coin_balance = EXCLUDED.coin_balance,
                df_kind = EXCLUDED.df_kind,
                finalized_in_cp = EXCLUDED.finalized_in_cp
            WHERE EXCLUDED.object_version > objects.object_version
        "#;
        // Split each live object into its columnar form plus the tx digest
        // used for the join above.
        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
            .into_iter()
            .map(LiveObject::split)
            .map(|(indexed_object, tx_digest)| {
                (
                    StoredObject::from(indexed_object),
                    tx_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        // Bind order must match $1..$15 in the query above exactly.
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(objects.object_ids)
            .bind::<Array<BigInt>, _>(objects.object_versions)
            .bind::<Array<Bytea>, _>(objects.object_digests)
            .bind::<Array<SmallInt>, _>(objects.owner_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
            .bind::<Array<Bytea>, _>(objects.serialized_objects)
            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
            .bind::<Array<Bytea>, _>(tx_digests)
            .bind::<Array<Nullable<BigInt>>, _>(objects.finalized_in_cps);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // Cloned because the retry macro may run the closure again.
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object mutations with error: {e}");
        })
    }
431
    /// Bulk-deletes removed objects from `objects` with one `UNNEST`-driven
    /// statement, retried inside a transaction.
    ///
    /// Mirrors `persist_changed_objects`: a removal is skipped when its
    /// transaction appears in `tx_global_order` with an optimistic sequence
    /// number other than NULL/-1, and a stored row is only deleted while its
    /// version is strictly older than the removal's version.
    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            DELETE FROM objects
            USING (
                SELECT u.object_id, u.object_version
                FROM UNNEST(
                    $1::BYTEA[],
                    $2::BIGINT[],
                    $3::BYTEA[]
                ) AS u(object_id, object_version, tx_digest)
                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
            ) AS to_delete
            WHERE objects.object_id = to_delete.object_id
            AND objects.object_version < to_delete.object_version
        "#;
        // Columnar arrays in $1..$3 bind order: ids, versions, tx digests.
        let (object_ids, versions, tx_digests): (Vec<_>, Vec<_>, Vec<_>) = objects
            .into_iter()
            .map(|removed_object| {
                (
                    removed_object.object_id().as_bytes().to_vec(),
                    removed_object.version() as i64,
                    removed_object.transaction_digest.into_inner().to_vec(),
                )
            })
            .multiunzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(object_ids)
            .bind::<Array<BigInt>, _>(versions)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // Cloned because the retry macro may run the closure again.
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object deletions with error: {e}");
        })
    }
483
    /// Upserts one chunk of mutated objects inside an already-open transaction
    /// (`conn` must belong to the caller's transaction).
    fn persist_object_mutation_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        on_conflict_do_update_with_condition!(
            objects::table,
            mutated_object_mutation_chunk,
            objects::object_id,
            (
                objects::object_id.eq(excluded(objects::object_id)),
                objects::object_version.eq(excluded(objects::object_version)),
                objects::object_digest.eq(excluded(objects::object_digest)),
                objects::owner_type.eq(excluded(objects::owner_type)),
                objects::owner_id.eq(excluded(objects::owner_id)),
                objects::object_type.eq(excluded(objects::object_type)),
                objects::serialized_object.eq(excluded(objects::serialized_object)),
                objects::coin_type.eq(excluded(objects::coin_type)),
                objects::coin_balance.eq(excluded(objects::coin_balance)),
                objects::df_kind.eq(excluded(objects::df_kind)),
                objects::finalized_in_cp.eq(excluded(objects::finalized_in_cp)),
            ),
            // The update only wins when the incoming version is strictly
            // greater — guards against out-of-order writes.
            excluded(objects::object_version).gt(objects::object_version),
            conn
        );
        Ok::<(), IndexerError>(())
    }
511
512    fn persist_object_deletion_chunk_in_existing_transaction(
513        &self,
514        conn: &mut PgConnection,
515        deleted_objects_chunk: Vec<StoredDeletedObject>,
516    ) -> Result<(), IndexerError> {
517        let (object_ids, object_versions): (Vec<_>, Vec<_>) = deleted_objects_chunk
518            .iter()
519            .map(|o| (o.object_id.clone(), o.object_version))
520            .unzip();
521        let raw_query = r#"
522            DELETE FROM objects
523            USING UNNEST($1::BYTEA[], $2::BIGINT[]) AS to_delete(object_id, object_version)
524            WHERE objects.object_id = to_delete.object_id
525            AND objects.object_version < to_delete.object_version
526        "#;
527        diesel::sql_query(raw_query)
528            .bind::<Array<Bytea>, _>(object_ids)
529            .bind::<Array<BigInt>, _>(object_versions)
530            .execute(conn)
531            .map_err(IndexerError::from)
532            .context("Failed to write object deletion to PostgresDB")?;
533        Ok::<(), IndexerError>(())
534    }
535
    /// Upserts a batch of object-snapshot rows, split into statements of
    /// `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows within one retried transaction.
    ///
    /// On conflict every listed column is overwritten by the incoming row
    /// (last-write-wins; unlike `objects`, no version guard is applied here).
    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object snapshot with error: {e}");
        })
    }
599
    /// Inserts a batch of object-history rows in intra-transaction chunks,
    /// ignoring rows that already exist (the history table is append-only).
    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object history with error: {e}");
        })
    }
636
    /// Inserts a batch of object-version rows in intra-transaction chunks,
    /// ignoring duplicates.
    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object versions with error: {e}");
        })
    }
669
    /// Persists a batch of checkpoints and their supporting rows.
    ///
    /// Write order: if the batch contains checkpoint 0, its digest is first
    /// stored as the chain identifier and protocol configs / feature flags are
    /// seeded; then `pruner_cp_watermark` rows are committed; the checkpoint
    /// rows themselves are committed last, after their supporting data.
    /// Commit-lag metrics are updated per persisted checkpoint.
    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        // Empty batch is a no-op.
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        // If the first checkpoint has sequence number 0, we need to persist the digest
        // as chain identifier.
        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest)?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        // Pruner watermark rows are derived from the same checkpoints and
        // committed first.
        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    // Lag metrics compare wall-clock "now" against each
                    // checkpoint's on-chain timestamp.
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics.max_committed_checkpoint_sequence_number.set(
                            stored_checkpoint.sequence_number,
                        );
                        self.metrics.committed_checkpoint_timestamp_ms.set(
                            stored_checkpoint.timestamp_ms,
                        );
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist checkpoints with error: {e}");
        })
    }
764
765    fn persist_transactions_chunk(
766        &self,
767        transactions: Vec<IndexedTransaction>,
768    ) -> Result<(), IndexerError> {
769        let guard = self
770            .metrics
771            .checkpoint_db_commit_latency_transactions_chunks
772            .start_timer();
773        let transformation_guard = self
774            .metrics
775            .checkpoint_db_commit_latency_transactions_chunks_transformation
776            .start_timer();
777        let transactions = transactions
778            .iter()
779            .map(StoredTransaction::from)
780            .collect::<Vec<_>>();
781        drop(transformation_guard);
782
783        transactional_blocking_with_retry!(
784            &self.blocking_cp,
785            |conn| {
786                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
787                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
788                }
789                Ok::<(), IndexerError>(())
790            },
791            PG_DB_COMMIT_SLEEP_DURATION
792        )
793        .tap_ok(|_| {
794            let elapsed = guard.stop_and_record();
795            info!(
796                elapsed,
797                "Persisted {} chunked transactions",
798                transactions.len()
799            );
800        })
801        .tap_err(|e| {
802            tracing::error!("failed to persist transactions with error: {e}");
803        })
804    }
805
806    fn persist_tx_global_order_chunk(
807        &self,
808        tx_order: Vec<TxGlobalOrder>,
809    ) -> Result<(), IndexerError> {
810        let guard = self
811            .metrics
812            .checkpoint_db_commit_latency_tx_insertion_order_chunks
813            .start_timer();
814
815        transactional_blocking_with_retry!(
816            &self.blocking_cp,
817            |conn| {
818                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
819                    // Upsert: on conflict (row already inserted by optimistic path),
820                    // set `chk_tx_sequence_number` so checkpoint data is available
821                    // immediately.
822                    on_conflict_do_update_with_condition!(
823                        tx_global_order::table,
824                        tx_order_chunk,
825                        tx_global_order::tx_digest,
826                        tx_global_order::chk_tx_sequence_number
827                            .eq(excluded(tx_global_order::chk_tx_sequence_number)),
828                        tx_global_order::chk_tx_sequence_number.is_null(),
829                        conn
830                    );
831                }
832                Ok::<(), IndexerError>(())
833            },
834            PG_DB_COMMIT_SLEEP_DURATION
835        )
836        .tap_ok(|_| {
837            let elapsed = guard.stop_and_record();
838            info!(
839                elapsed,
840                "Persisted {} chunked txs insertion order",
841                tx_order.len()
842            );
843        })
844        .tap_err(|e| {
845            tracing::error!("failed to persist txs insertion order with error: {e}");
846        })
847    }
848
849    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
850        let guard = self
851            .metrics
852            .checkpoint_db_commit_latency_events_chunks
853            .start_timer();
854        let len = events.len();
855        let events = events
856            .into_iter()
857            .map(StoredEvent::from)
858            .collect::<Vec<_>>();
859
860        transactional_blocking_with_retry!(
861            &self.blocking_cp,
862            |conn| {
863                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
864                    insert_or_ignore_into!(events::table, event_chunk, conn);
865                }
866                Ok::<(), IndexerError>(())
867            },
868            PG_DB_COMMIT_SLEEP_DURATION
869        )
870        .tap_ok(|_| {
871            let elapsed = guard.stop_and_record();
872            info!(elapsed, "Persisted {} chunked events", len);
873        })
874        .tap_err(|e| {
875            tracing::error!("failed to persist events with error: {e}");
876        })
877    }
878
879    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
880        if packages.is_empty() {
881            return Ok(());
882        }
883        let guard = self
884            .metrics
885            .checkpoint_db_commit_latency_packages
886            .start_timer();
887        let packages = packages
888            .into_iter()
889            .map(StoredPackage::from)
890            .collect::<Vec<_>>();
891        transactional_blocking_with_retry!(
892            &self.blocking_cp,
893            |conn| {
894                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
895                    on_conflict_do_update!(
896                        packages::table,
897                        packages_chunk,
898                        packages::package_id,
899                        (
900                            packages::package_id.eq(excluded(packages::package_id)),
901                            packages::move_package.eq(excluded(packages::move_package)),
902                        ),
903                        conn
904                    );
905                }
906                Ok::<(), IndexerError>(())
907            },
908            PG_DB_COMMIT_SLEEP_DURATION
909        )
910        .tap_ok(|_| {
911            let elapsed = guard.stop_and_record();
912            info!(elapsed, "Persisted {} packages", packages.len());
913        })
914        .tap_err(|e| {
915            tracing::error!("failed to persist packages with error: {e}");
916        })
917    }
918
919    async fn persist_event_indices_chunk(
920        &self,
921        indices: Vec<EventIndex>,
922    ) -> Result<(), IndexerError> {
923        let guard = self
924            .metrics
925            .checkpoint_db_commit_latency_event_indices_chunks
926            .start_timer();
927        let len = indices.len();
928        let (
929            event_emit_packages,
930            event_emit_modules,
931            event_senders,
932            event_struct_packages,
933            event_struct_modules,
934            event_struct_names,
935            event_struct_instantiations,
936        ) = indices.into_iter().map(|i| i.split()).fold(
937            (
938                Vec::new(),
939                Vec::new(),
940                Vec::new(),
941                Vec::new(),
942                Vec::new(),
943                Vec::new(),
944                Vec::new(),
945            ),
946            |(
947                mut event_emit_packages,
948                mut event_emit_modules,
949                mut event_senders,
950                mut event_struct_packages,
951                mut event_struct_modules,
952                mut event_struct_names,
953                mut event_struct_instantiations,
954            ),
955             index| {
956                event_emit_packages.push(index.0);
957                event_emit_modules.push(index.1);
958                event_senders.push(index.2);
959                event_struct_packages.push(index.3);
960                event_struct_modules.push(index.4);
961                event_struct_names.push(index.5);
962                event_struct_instantiations.push(index.6);
963                (
964                    event_emit_packages,
965                    event_emit_modules,
966                    event_senders,
967                    event_struct_packages,
968                    event_struct_modules,
969                    event_struct_names,
970                    event_struct_instantiations,
971                )
972            },
973        );
974
975        // Now persist all the event indices in parallel into their tables.
976        let mut futures = vec![];
977        futures.push(self.spawn_blocking_task(move |this| {
978            persist_chunk_into_table!(
979                event_emit_package::table,
980                event_emit_packages,
981                &this.blocking_cp
982            )
983        }));
984
985        futures.push(self.spawn_blocking_task(move |this| {
986            persist_chunk_into_table!(
987                event_emit_module::table,
988                event_emit_modules,
989                &this.blocking_cp
990            )
991        }));
992
993        futures.push(self.spawn_blocking_task(move |this| {
994            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
995        }));
996
997        futures.push(self.spawn_blocking_task(move |this| {
998            persist_chunk_into_table!(
999                event_struct_package::table,
1000                event_struct_packages,
1001                &this.blocking_cp
1002            )
1003        }));
1004
1005        futures.push(self.spawn_blocking_task(move |this| {
1006            persist_chunk_into_table!(
1007                event_struct_module::table,
1008                event_struct_modules,
1009                &this.blocking_cp
1010            )
1011        }));
1012
1013        futures.push(self.spawn_blocking_task(move |this| {
1014            persist_chunk_into_table!(
1015                event_struct_name::table,
1016                event_struct_names,
1017                &this.blocking_cp
1018            )
1019        }));
1020
1021        futures.push(self.spawn_blocking_task(move |this| {
1022            persist_chunk_into_table!(
1023                event_struct_instantiation::table,
1024                event_struct_instantiations,
1025                &this.blocking_cp
1026            )
1027        }));
1028
1029        futures::future::try_join_all(futures)
1030            .await
1031            .map_err(|e| {
1032                tracing::error!("failed to join event indices futures in a chunk: {e}");
1033                IndexerError::from(e)
1034            })?
1035            .into_iter()
1036            .collect::<Result<Vec<_>, _>>()
1037            .map_err(|e| {
1038                IndexerError::PostgresWrite(format!(
1039                    "Failed to persist all event indices in a chunk: {e:?}"
1040                ))
1041            })?;
1042        let elapsed = guard.stop_and_record();
1043        info!(elapsed, "Persisted {} chunked event indices", len);
1044        Ok(())
1045    }
1046
1047    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1048        let guard = self
1049            .metrics
1050            .checkpoint_db_commit_latency_tx_indices_chunks
1051            .start_timer();
1052        let len = indices.len();
1053
1054        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();
1055
1056        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
1057        let recipients: Vec<_> = splits
1058            .iter()
1059            .flat_map(|ix| ix.tx_recipients.clone())
1060            .collect();
1061        let input_objects: Vec<_> = splits
1062            .iter()
1063            .flat_map(|ix| ix.tx_input_objects.clone())
1064            .collect();
1065        let changed_objects: Vec<_> = splits
1066            .iter()
1067            .flat_map(|ix| ix.tx_changed_objects.clone())
1068            .collect();
1069        let wrapped_or_deleted_objects: Vec<_> = splits
1070            .iter()
1071            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
1072            .collect();
1073        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
1074        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
1075        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
1076        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1077        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1078
1079        let futures = [
1080            self.spawn_blocking_task(move |this| {
1081                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1082            }),
1083            self.spawn_blocking_task(move |this| {
1084                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1085            }),
1086            self.spawn_blocking_task(move |this| {
1087                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1088            }),
1089            self.spawn_blocking_task(move |this| {
1090                persist_chunk_into_table!(
1091                    tx_changed_objects::table,
1092                    changed_objects,
1093                    &this.blocking_cp
1094                )
1095            }),
1096            self.spawn_blocking_task(move |this| {
1097                persist_chunk_into_table!(
1098                    tx_wrapped_or_deleted_objects::table,
1099                    wrapped_or_deleted_objects,
1100                    &this.blocking_cp
1101                )
1102            }),
1103            self.spawn_blocking_task(move |this| {
1104                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1105            }),
1106            self.spawn_blocking_task(move |this| {
1107                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1108            }),
1109            self.spawn_blocking_task(move |this| {
1110                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1111            }),
1112            self.spawn_blocking_task(move |this| {
1113                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1114            }),
1115            self.spawn_blocking_task(move |this| {
1116                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1117            }),
1118        ];
1119
1120        futures::future::try_join_all(futures)
1121            .await
1122            .map_err(|e| {
1123                tracing::error!("failed to join tx indices futures in a chunk: {e}");
1124                IndexerError::from(e)
1125            })?
1126            .into_iter()
1127            .collect::<Result<Vec<_>, _>>()
1128            .map_err(|e| {
1129                IndexerError::PostgresWrite(format!(
1130                    "Failed to persist all tx indices in a chunk: {e:?}"
1131                ))
1132            })?;
1133        let elapsed = guard.stop_and_record();
1134        info!(elapsed, "Persisted {} chunked tx_indices", len);
1135        Ok(())
1136    }
1137
    /// Persists an epoch transition in a single retried transaction: when
    /// `epoch.last_epoch` is set, the previous epoch's row is updated with
    /// its end-of-epoch data; the new epoch's beginning info is then
    /// inserted, with the insert ignored if the row already exists.
    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        // Captured up front so the success log can use it after `epoch`
        // has been moved into the transaction closure.
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // Finalize the previous epoch's row (end-of-epoch data) if any.
                if let Some(last_epoch) = &epoch.last_epoch {
                    info!(last_epoch.epoch, "Persisting epoch end data.");
                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
                        .set(last_epoch)
                        .execute(conn)?;
                }

                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist epoch with error: {e}");
        })
    }
1169
1170    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1171        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1172        // partition_0 has been created, so no need to advance it.
1173        if let Some(last_epoch_id) = last_epoch_id {
1174            let last_db_epoch: Option<StoredEpochInfo> =
1175                read_only_blocking!(&self.blocking_cp, |conn| {
1176                    epochs::table
1177                        .filter(epochs::epoch.eq(last_epoch_id))
1178                        .first::<StoredEpochInfo>(conn)
1179                        .optional()
1180                })
1181                .context("Failed to read last epoch from PostgresDB")?;
1182            if let Some(last_epoch) = last_db_epoch {
1183                let epoch_partition_data =
1184                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1185                let table_partitions = self.partition_manager.get_table_partitions()?;
1186                for (table, (_, last_partition)) in table_partitions {
1187                    // Only advance epoch partition for epoch partitioned tables.
1188                    if !self
1189                        .partition_manager
1190                        .get_strategy(&table)
1191                        .is_epoch_partitioned()
1192                    {
1193                        continue;
1194                    }
1195                    let guard = self.metrics.advance_epoch_latency.start_timer();
1196                    self.partition_manager.advance_epoch(
1197                        table.clone(),
1198                        last_partition,
1199                        &epoch_partition_data,
1200                    )?;
1201                    let elapsed = guard.stop_and_record();
1202                    info!(
1203                        elapsed,
1204                        "Advanced epoch partition {} for table {}",
1205                        last_partition,
1206                        table.clone()
1207                    );
1208                }
1209            } else {
1210                tracing::error!("last epoch: {last_epoch_id} from PostgresDB is None.");
1211            }
1212        }
1213
1214        Ok(())
1215    }
1216
1217    fn prune_checkpoints_table_by_range(
1218        &self,
1219        min_cp: u64,
1220        max_cp: u64,
1221    ) -> Result<(), IndexerError> {
1222        transactional_blocking_with_retry!(
1223            &self.blocking_cp,
1224            |conn| {
1225                diesel::delete(
1226                    checkpoints::table
1227                        .filter(checkpoints::sequence_number.between(min_cp as i64, max_cp as i64)),
1228                )
1229                .execute(conn)
1230                .map_err(IndexerError::from)
1231                .context("Failed to prune checkpoints table by range")?;
1232
1233                Ok::<(), IndexerError>(())
1234            },
1235            PG_DB_COMMIT_SLEEP_DURATION
1236        )
1237    }
1238
1239    /// Prunes tx_global_order table by transaction range using
1240    /// chk_tx_sequence_number
1241    fn prune_tx_global_order(
1242        &self,
1243        conn: &mut PgConnection,
1244        min_tx: i64,
1245        max_tx: i64,
1246    ) -> Result<(), IndexerError> {
1247        diesel::delete(
1248            tx_global_order::table
1249                .filter(tx_global_order::chk_tx_sequence_number.between(min_tx, max_tx)),
1250        )
1251        .execute(conn)
1252        .map_err(IndexerError::from)
1253        .context("Failed to prune tx_global_order table")
1254        .map(|_| ())
1255    }
1256
    /// Prunes a single transaction or event index table by transaction range
    /// (inclusive `[min_tx, max_tx]`). Each supported table is deleted via
    /// the `prune_tx_or_event_indice_table!` macro inside one retried
    /// transaction; `tx_global_order` has its own pruning routine. Any other
    /// table yields an `InvalidArgument` error.
    fn prune_single_tx_or_event_table(
        &self,
        table: &crate::pruning::pruner::PrunableTable,
        min_tx: u64,
        max_tx: u64,
    ) -> Result<(), IndexerError> {
        use crate::pruning::pruner::PrunableTable;

        // Shadow with signed values once; the DB columns are BIGINT.
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                match table {
                    // Event index tables
                    PrunableTable::EventEmitModule => {
                        prune_tx_or_event_indice_table!(
                            event_emit_module,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_emit_module table"
                        );
                    }
                    PrunableTable::EventEmitPackage => {
                        prune_tx_or_event_indice_table!(
                            event_emit_package,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_emit_package table"
                        );
                    }
                    PrunableTable::EventSenders => {
                        prune_tx_or_event_indice_table!(
                            event_senders,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_senders table"
                        );
                    }
                    PrunableTable::EventStructInstantiation => {
                        prune_tx_or_event_indice_table!(
                            event_struct_instantiation,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_instantiation table"
                        );
                    }
                    PrunableTable::EventStructModule => {
                        prune_tx_or_event_indice_table!(
                            event_struct_module,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_module table"
                        );
                    }
                    PrunableTable::EventStructName => {
                        prune_tx_or_event_indice_table!(
                            event_struct_name,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_name table"
                        );
                    }
                    PrunableTable::EventStructPackage => {
                        prune_tx_or_event_indice_table!(
                            event_struct_package,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_package table"
                        );
                    }

                    // Transaction index tables
                    PrunableTable::TxSenders => {
                        prune_tx_or_event_indice_table!(
                            tx_senders,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_senders table"
                        );
                    }
                    PrunableTable::TxRecipients => {
                        prune_tx_or_event_indice_table!(
                            tx_recipients,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_recipients table"
                        );
                    }
                    PrunableTable::TxInputObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_input_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_input_objects table"
                        );
                    }
                    PrunableTable::TxChangedObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_changed_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_changed_objects table"
                        );
                    }
                    PrunableTable::TxWrappedOrDeletedObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_wrapped_or_deleted_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_wrapped_or_deleted_objects table"
                        );
                    }
                    PrunableTable::TxCallsPkg => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_pkg,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_pkg table"
                        );
                    }
                    PrunableTable::TxCallsMod => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_mod,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_mod table"
                        );
                    }
                    PrunableTable::TxCallsFun => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_fun,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_fun table"
                        );
                    }
                    PrunableTable::TxDigests => {
                        prune_tx_or_event_indice_table!(
                            tx_digests,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_digests table"
                        );
                    }
                    PrunableTable::TxKinds => {
                        prune_tx_or_event_indice_table!(
                            tx_kinds,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_kinds table"
                        );
                    }
                    PrunableTable::TxGlobalOrder => {
                        self.prune_tx_global_order(conn, min_tx, max_tx)?;
                    }
                    // Tables pruned by other criteria (e.g. by checkpoint
                    // range) are rejected here.
                    _ => {
                        return Err(IndexerError::InvalidArgument(format!(
                            "table {} is not a transaction or event index table",
                            table.as_ref()
                        )));
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1443
    /// Prune optimistic_transactions table by global_sequence_number range.
    /// Prunes at most `limit` rows and returns the number of rows deleted.
    ///
    /// The range `[start, end]` is inclusive. Deletion is done via a CTE
    /// that first selects (and locks, `FOR UPDATE`) the candidate keys so
    /// the batch size can be capped with `LIMIT`, since PostgreSQL's
    /// `DELETE` has no `LIMIT` clause of its own.
    fn prune_optimistic_tx_by_global_seq(
        &self,
        start: u64,
        end: u64,
        limit: i64,
    ) -> Result<usize, IndexerError> {
        use diesel::prelude::*;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                let sql = r#"
                    WITH ids_to_delete AS (
                         SELECT global_sequence_number, optimistic_sequence_number
                         FROM optimistic_transactions
                         WHERE global_sequence_number BETWEEN $1 AND $2
                         ORDER BY global_sequence_number, optimistic_sequence_number
                         FOR UPDATE
                         LIMIT $3
                     )
                     DELETE FROM optimistic_transactions otx
                     USING ids_to_delete
                     WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
                           (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
                "#;
                // `execute` returns the number of deleted rows, which the
                // retry macro propagates as this function's Ok value.
                diesel::sql_query(sql)
                    .bind::<diesel::sql_types::BigInt, _>(start as i64)
                    .bind::<diesel::sql_types::BigInt, _>(end as i64)
                    .bind::<diesel::sql_types::BigInt, _>(limit)
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context(
                        format!(
                            "failed to prune optimistic_transactions table by global_sequence_number range [{start}..={end}] with limit {limit}"
                        )
                        .as_str(),
                    )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1487
1488    fn prune_cp_tx_table_by_range(&self, min_cp: u64, max_cp: u64) -> Result<(), IndexerError> {
1489        transactional_blocking_with_retry!(
1490            &self.blocking_cp,
1491            |conn| {
1492                diesel::delete(
1493                    pruner_cp_watermark::table.filter(
1494                        pruner_cp_watermark::checkpoint_sequence_number
1495                            .between(min_cp as i64, max_cp as i64),
1496                    ),
1497                )
1498                .execute(conn)
1499                .map_err(IndexerError::from)
1500                .context("Failed to prune pruner_cp_watermark table by range")?;
1501                Ok::<(), IndexerError>(())
1502            },
1503            PG_DB_COMMIT_SLEEP_DURATION
1504        )
1505    }
1506
1507    fn prune_table_by_checkpoint_range(
1508        &self,
1509        table: &crate::pruning::pruner::PrunableTable,
1510        min_checkpoint: u64,
1511        max_checkpoint: u64,
1512    ) -> Result<(), IndexerError> {
1513        use crate::pruning::pruner::PrunableTable;
1514
1515        match table {
1516            PrunableTable::Checkpoints => {
1517                self.prune_checkpoints_table_by_range(min_checkpoint, max_checkpoint)
1518            }
1519            PrunableTable::PrunerCpWatermark => {
1520                self.prune_cp_tx_table_by_range(min_checkpoint, max_checkpoint)
1521            }
1522            _ => Err(IndexerError::InvalidArgument(format!(
1523                "table {} is not pruned by checkpoint",
1524                table.as_ref()
1525            ))),
1526        }
1527    }
1528
    /// Reads `network_total_transactions` recorded for `epoch` from the
    /// `epochs` table.
    ///
    /// The column is nullable, hence the `Option` in the result.
    /// NOTE(review): if the epoch row itself does not exist, `get_result`
    /// surfaces a not-found error instead of `Ok(None)` — confirm callers
    /// expect that distinction.
    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::table
                .filter(epochs::epoch.eq(epoch as i64))
                .select(epochs::network_total_transactions)
                .get_result::<Option<i64>>(conn)
        })
        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
        // Stored as i64 in Postgres; convert back to the u64 domain value.
        .map(|option| option.map(|v| v as u64))
    }
1542
    /// Refreshes the `participation_metrics` materialized view, retrying the
    /// statement on transient failures.
    ///
    /// Success and failure are both logged; errors are still propagated to
    /// the caller after the error log.
    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("failed to refresh participation_metrics: {e}");
        })
    }
1560
    /// Upserts the upper-bound watermark (current epoch, max committed
    /// checkpoint and tx) for every entity variant produced by the enum `E`.
    ///
    /// The `filter` calls after `do_update().set(...)` form the upsert's
    /// `WHERE` clause: a row is only updated when every excluded (incoming)
    /// value is >= the stored one, so upper bounds never move backwards.
    fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        use diesel::query_dsl::methods::FilterDsl;

        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();

        // One watermark row per enum variant, all sharing the same bounds.
        let upper_bound_updates = E::iter()
            .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&upper_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::current_epoch.eq(excluded(watermarks::current_epoch)),
                        watermarks::max_committed_cp.eq(excluded(watermarks::max_committed_cp)),
                        watermarks::max_committed_tx.eq(excluded(watermarks::max_committed_tx)),
                    ))
                    .filter(excluded(watermarks::max_committed_cp).ge(watermarks::max_committed_cp))
                    .filter(excluded(watermarks::max_committed_tx).ge(watermarks::max_committed_tx))
                    .filter(excluded(watermarks::current_epoch).ge(watermarks::current_epoch))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("Failed to update watermarks upper bound")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {}", e);
        })
    }
1609
    /// Looks up, for each requested epoch, its first checkpoint id and first
    /// transaction sequence number.
    ///
    /// Returns `epoch -> (first_checkpoint_id, first_tx_sequence_number)`.
    /// Epochs without a row in the `epochs` table are silently absent from
    /// the map; callers must handle missing keys themselves.
    fn map_epochs_to_cp_tx(
        &self,
        epochs: &[u64],
    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
        let pool = &self.blocking_cp;
        let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
            epochs::table
                .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
                .select((
                    epochs::epoch,
                    epochs::first_checkpoint_id,
                    epochs::first_tx_sequence_number,
                ))
                .load::<(i64, i64, i64)>(conn)
        })
        .context("Failed to fetch first checkpoint and tx seq num for epochs")?;

        // Convert the i64 DB representation back to the u64 domain values.
        Ok(results
            .into_iter()
            .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
            .collect())
    }
1632
    /// Upserts the lower-bound watermark (min available checkpoint/tx/epoch)
    /// for the given prunable tables.
    ///
    /// Each epoch is first translated into its first checkpoint id and first
    /// tx sequence number via `map_epochs_to_cp_tx`; a missing epoch row is
    /// treated as data corruption. The upsert's `WHERE` clause (the `filter`
    /// calls) only lets bounds move strictly forward, and additionally
    /// requires the DB's current time to be past the stored
    /// `min_bounds_updated_at_timestamp_ms`.
    /// NOTE(review): the intent of that last time-based condition is not
    /// visible here — presumably it guards against clock anomalies; confirm.
    fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        use diesel::query_dsl::methods::FilterDsl;

        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
        // Build one StoredWatermark per (table, epoch); fail fast if any
        // epoch was not found in the mapping.
        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
            .into_iter()
            .map(|(table, epoch)| {
                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
                    IndexerError::PersistentStorageDataCorruption(format!(
                        "epoch {epoch} not found in epoch mapping",
                    ))
                })?;
                Ok(StoredWatermark::from_lower_bound_update(
                    table.as_ref(),
                    epoch,
                    *checkpoint,
                    *tx,
                ))
            })
            .collect();
        let lower_bound_updates = lookups?;
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&lower_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::min_available_cp.eq(excluded(watermarks::min_available_cp)),
                        watermarks::min_available_tx.eq(excluded(watermarks::min_available_tx)),
                        watermarks::min_available_epoch
                            .eq(excluded(watermarks::min_available_epoch)),
                        // Record when the lower bounds last moved, using the
                        // DB server's clock (epoch milliseconds).
                        watermarks::min_bounds_updated_at_timestamp_ms.eq(sql::<
                            diesel::sql_types::BigInt,
                        >(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )),
                    ))
                    .filter(excluded(watermarks::min_available_cp).gt(watermarks::min_available_cp))
                    .filter(excluded(watermarks::min_available_tx).gt(watermarks::min_available_tx))
                    .filter(
                        excluded(watermarks::min_available_epoch)
                            .gt(watermarks::min_available_epoch),
                    )
                    .filter(
                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )
                        .gt(watermarks::min_bounds_updated_at_timestamp_ms),
                    )
                    .execute(conn)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            tracing::info!(elapsed, "Persisted watermarks lower bounds");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {}", e);
        })?;
        Ok(())
    }
1705
    /// Loads every row from the `watermarks` table together with the DB
    /// server's current time in epoch milliseconds (read in the same query
    /// round-trip so the timestamp is consistent with the snapshot read).
    fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        // read_only transaction, otherwise this will block and get blocked by write
        // transactions to the same table.
        run_query_with_retry!(
            &self.blocking_cp,
            |conn| {
                let stored = watermarks::table
                    .load::<StoredWatermark>(conn)
                    .map_err(Into::into)
                    .context("Failed reading watermarks from PostgresDB")?;
                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                ))
                .get_result(conn)
                .map_err(Into::into)
                .context("Failed reading current timestamp from PostgresDB")?;
                Ok::<_, IndexerError>((stored, timestamp))
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1727
    /// Sets `lowest_unpruned_key` on the watermark row for the given table.
    ///
    /// This is an unconditional update (no monotonicity guard like the
    /// upper/lower-bound upserts); a missing entity row updates zero rows
    /// without error.
    fn update_watermark_lowest_unpruned_key(
        &self,
        table: &PrunableTable,
        lowest_unpruned_key: u64,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::update(watermarks::table.filter(watermarks::entity.eq(table.as_ref())))
                    .set(watermarks::lowest_unpruned_key.eq(lowest_unpruned_key as i64))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("failed to update watermark lowest_unpruned_key")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1746
    /// Fetches the watermark row for a single entity name, or `None` if no
    /// such row exists (`optional()` maps the not-found case to `Ok(None)`).
    fn get_watermark_by_entity(
        &self,
        entity: &str,
    ) -> Result<Option<StoredWatermark>, IndexerError> {
        run_query_with_retry!(
            &self.blocking_cp,
            |conn| {
                watermarks::table
                    .filter(watermarks::entity.eq(entity))
                    .first::<StoredWatermark>(conn)
                    .optional()
                    .map_err(Into::into)
                    .context("failed reading watermark by entity from PostgresDB")
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1764
1765    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1766    where
1767        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1768        R: Send + 'static,
1769    {
1770        let this = self.clone();
1771        let current_span = tracing::Span::current();
1772        tokio::task::spawn_blocking(move || {
1773            mark_in_blocking_pool();
1774            let _guard = current_span.enter();
1775            f(this)
1776        })
1777        .await
1778        .map_err(Into::into)
1779        .and_then(std::convert::identity)
1780    }
1781
1782    fn spawn_blocking_task<F, R>(
1783        &self,
1784        f: F,
1785    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1786    where
1787        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1788        R: Send + 'static,
1789    {
1790        let this = self.clone();
1791        let current_span = tracing::Span::current();
1792        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1793        tokio::task::spawn_blocking(move || {
1794            mark_in_blocking_pool();
1795            let _guard = current_span.enter();
1796            let _elapsed = guard.stop_and_record();
1797            f(this)
1798        })
1799    }
1800
1801    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1802    where
1803        F: FnOnce(Self) -> Fut + Send + 'static,
1804        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1805        R: Send + 'static,
1806    {
1807        let this = self.clone();
1808        tokio::task::spawn(async move { f(this).await })
1809    }
1810}
1811
1812#[async_trait]
1813impl IndexerStore for PgIndexerStore {
1814    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1815        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1816            .await
1817    }
1818
1819    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1820        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1821            .await
1822    }
1823
1824    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1825        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1826            .await
1827    }
1828
1829    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1830        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1831            .await
1832    }
1833
1834    async fn get_latest_object_snapshot_watermark(
1835        &self,
1836    ) -> Result<Option<CommitterWatermark>, IndexerError> {
1837        self.execute_in_blocking_worker(|this| this.get_latest_object_snapshot_watermark())
1838            .await
1839    }
1840
1841    async fn get_latest_object_snapshot_checkpoint_sequence_number(
1842        &self,
1843    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
1844        self.execute_in_blocking_worker(|this| {
1845            this.get_latest_object_snapshot_checkpoint_sequence_number()
1846        })
1847        .await
1848    }
1849
1850    fn persist_objects_in_existing_transaction(
1851        &self,
1852        conn: &mut PgConnection,
1853        object_changes: Vec<TransactionObjectChangesToCommit>,
1854    ) -> Result<(), IndexerError> {
1855        if object_changes.is_empty() {
1856            return Ok(());
1857        }
1858
1859        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1860        let object_mutations = indexed_mutations
1861            .into_iter()
1862            .map(StoredObject::from)
1863            .collect::<Vec<_>>();
1864        let object_deletions = indexed_deletions
1865            .into_iter()
1866            .map(StoredDeletedObject::from)
1867            .collect::<Vec<_>>();
1868
1869        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1870        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1871
1872        Ok(())
1873    }
1874
1875    async fn persist_objects_snapshot(
1876        &self,
1877        object_changes: Vec<TransactionObjectChangesToCommit>,
1878    ) -> Result<(), IndexerError> {
1879        if object_changes.is_empty() {
1880            return Ok(());
1881        }
1882        let guard = self
1883            .metrics
1884            .checkpoint_db_commit_latency_objects_snapshot
1885            .start_timer();
1886        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1887        let objects_snapshot = indexed_mutations
1888            .into_iter()
1889            .map(StoredObjectSnapshot::try_from)
1890            .chain(
1891                indexed_deletions
1892                    .into_iter()
1893                    .map(|o| Ok(StoredObjectSnapshot::from(o))),
1894            )
1895            .collect::<Result<Vec<_>, _>>()?;
1896        let len = objects_snapshot.len();
1897        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1898        let futures = chunks
1899            .into_iter()
1900            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1901
1902        futures::future::try_join_all(futures)
1903            .await
1904            .map_err(|e| {
1905                tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
1906                IndexerError::from(e)
1907            })?
1908            .into_iter()
1909            .collect::<Result<Vec<_>, _>>()
1910            .map_err(|e| {
1911                IndexerError::PostgresWrite(format!(
1912                    "Failed to persist all objects snapshot chunks: {e:?}"
1913                ))
1914            })?;
1915        let elapsed = guard.stop_and_record();
1916        info!(elapsed, "Persisted {} objects snapshot", len);
1917        Ok(())
1918    }
1919
1920    async fn persist_object_history(
1921        &self,
1922        object_changes: Vec<TransactionObjectChangesToCommit>,
1923    ) -> Result<(), IndexerError> {
1924        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1925            .map(|val| val.eq_ignore_ascii_case("true"))
1926            .unwrap_or(false);
1927        if skip_history {
1928            info!("skipping object history");
1929            return Ok(());
1930        }
1931
1932        if object_changes.is_empty() {
1933            return Ok(());
1934        }
1935        let objects = make_objects_history_to_commit(object_changes)?;
1936        let guard = self
1937            .metrics
1938            .checkpoint_db_commit_latency_objects_history
1939            .start_timer();
1940
1941        let len = objects.len();
1942        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1943        let futures = chunks
1944            .into_iter()
1945            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1946
1947        futures::future::try_join_all(futures)
1948            .await
1949            .map_err(|e| {
1950                tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
1951                IndexerError::from(e)
1952            })?
1953            .into_iter()
1954            .collect::<Result<Vec<_>, _>>()
1955            .map_err(|e| {
1956                IndexerError::PostgresWrite(format!(
1957                    "Failed to persist all objects history chunks: {e:?}"
1958                ))
1959            })?;
1960        let elapsed = guard.stop_and_record();
1961        info!(elapsed, "Persisted {} objects history", len);
1962        Ok(())
1963    }
1964
1965    async fn persist_object_versions(
1966        &self,
1967        object_versions: Vec<StoredObjectVersion>,
1968    ) -> Result<(), IndexerError> {
1969        if object_versions.is_empty() {
1970            return Ok(());
1971        }
1972
1973        let guard = self
1974            .metrics
1975            .checkpoint_db_commit_latency_objects_version
1976            .start_timer();
1977
1978        let object_versions_count = object_versions.len();
1979
1980        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1981        let futures = chunks
1982            .into_iter()
1983            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1984            .collect::<Vec<_>>();
1985
1986        futures::future::try_join_all(futures)
1987            .await
1988            .map_err(|e| {
1989                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
1990                IndexerError::from(e)
1991            })?
1992            .into_iter()
1993            .collect::<Result<Vec<_>, _>>()
1994            .map_err(|e| {
1995                IndexerError::PostgresWrite(format!(
1996                    "Failed to persist all objects version chunks: {e:?}"
1997                ))
1998            })?;
1999        let elapsed = guard.stop_and_record();
2000        info!(elapsed, "Persisted {object_versions_count} object versions");
2001        Ok(())
2002    }
2003
2004    async fn persist_checkpoints(
2005        &self,
2006        checkpoints: Vec<IndexedCheckpoint>,
2007    ) -> Result<(), IndexerError> {
2008        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
2009            .await
2010    }
2011
2012    async fn persist_transactions(
2013        &self,
2014        transactions: Vec<IndexedTransaction>,
2015    ) -> Result<(), IndexerError> {
2016        let guard = self
2017            .metrics
2018            .checkpoint_db_commit_latency_transactions
2019            .start_timer();
2020        let len = transactions.len();
2021
2022        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
2023        let futures = chunks
2024            .into_iter()
2025            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
2026
2027        futures::future::try_join_all(futures)
2028            .await
2029            .map_err(|e| {
2030                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
2031                IndexerError::from(e)
2032            })?
2033            .into_iter()
2034            .collect::<Result<Vec<_>, _>>()
2035            .map_err(|e| {
2036                IndexerError::PostgresWrite(format!(
2037                    "Failed to persist all transactions chunks: {e:?}"
2038                ))
2039            })?;
2040        let elapsed = guard.stop_and_record();
2041        info!(elapsed, "Persisted {} transactions", len);
2042        Ok(())
2043    }
2044
    /// Inserts a single optimistic transaction inside the caller's already
    /// open DB transaction, ignoring conflicts (a duplicate row is not an
    /// error).
    fn persist_optimistic_transaction_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
        Ok(())
    }
2053
2054    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
2055        if events.is_empty() {
2056            return Ok(());
2057        }
2058        let len = events.len();
2059        let guard = self
2060            .metrics
2061            .checkpoint_db_commit_latency_events
2062            .start_timer();
2063        let chunks = chunk!(events, self.config.parallel_chunk_size);
2064        let futures = chunks
2065            .into_iter()
2066            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
2067
2068        futures::future::try_join_all(futures)
2069            .await
2070            .map_err(|e| {
2071                tracing::error!("failed to join persist_events_chunk futures: {e}");
2072                IndexerError::from(e)
2073            })?
2074            .into_iter()
2075            .collect::<Result<Vec<_>, _>>()
2076            .map_err(|e| {
2077                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
2078            })?;
2079        let elapsed = guard.stop_and_record();
2080        info!(elapsed, "Persisted {} events", len);
2081        Ok(())
2082    }
2083
2084    async fn persist_displays(
2085        &self,
2086        display_updates: BTreeMap<String, StoredDisplay>,
2087    ) -> Result<(), IndexerError> {
2088        if display_updates.is_empty() {
2089            return Ok(());
2090        }
2091
2092        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
2093            .await?
2094    }
2095
    /// Upserts display rows inside the caller's already open DB transaction.
    ///
    /// Rows conflict on `object_type`; an existing row is only overwritten
    /// when the incoming (`excluded`) version is strictly greater than the
    /// stored one, so stale display updates are ignored.
    fn persist_displays_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        display_updates: Vec<&StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        on_conflict_do_update_with_condition!(
            display::table,
            display_updates,
            display::object_type,
            (
                display::id.eq(excluded(display::id)),
                display::version.eq(excluded(display::version)),
                display::bcs.eq(excluded(display::bcs)),
            ),
            excluded(display::version).gt(display::version),
            conn
        );

        Ok(())
    }
2120
2121    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
2122        if packages.is_empty() {
2123            return Ok(());
2124        }
2125        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
2126            .await
2127    }
2128
2129    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
2130        if indices.is_empty() {
2131            return Ok(());
2132        }
2133        let len = indices.len();
2134        let guard = self
2135            .metrics
2136            .checkpoint_db_commit_latency_event_indices
2137            .start_timer();
2138        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2139
2140        let futures = chunks.into_iter().map(|chunk| {
2141            self.spawn_task(move |this: Self| async move {
2142                this.persist_event_indices_chunk(chunk).await
2143            })
2144        });
2145
2146        futures::future::try_join_all(futures)
2147            .await
2148            .map_err(|e| {
2149                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
2150                IndexerError::from(e)
2151            })?
2152            .into_iter()
2153            .collect::<Result<Vec<_>, _>>()
2154            .map_err(|e| {
2155                IndexerError::PostgresWrite(format!(
2156                    "Failed to persist all event_indices chunks: {e:?}"
2157                ))
2158            })?;
2159        let elapsed = guard.stop_and_record();
2160        info!(elapsed, "Persisted {} event_indices chunks", len);
2161        Ok(())
2162    }
2163
2164    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2165        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2166            .await
2167    }
2168
2169    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2170        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2171            .await
2172    }
2173
2174    async fn get_network_total_transactions_by_end_of_epoch(
2175        &self,
2176        epoch: u64,
2177    ) -> Result<Option<u64>, IndexerError> {
2178        self.execute_in_blocking_worker(move |this| {
2179            this.get_network_total_transactions_by_end_of_epoch(epoch)
2180        })
2181        .await
2182    }
2183
2184    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2185        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2186            .await
2187    }
2188
2189    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
2190        &self,
2191        watermark: CommitterWatermark,
2192    ) -> Result<(), IndexerError>
2193    where
2194        E::Iterator: Iterator<Item: AsRef<str>>,
2195    {
2196        self.execute_in_blocking_worker(move |this| {
2197            this.update_watermarks_upper_bound::<E>(watermark)
2198        })
2199        .await
2200    }
2201
    /// Exposes the concrete store as `&dyn Any` so callers can downcast.
    fn as_any(&self) -> &dyn StdAny {
        self
    }
2205
    /// Persist protocol configs and feature flags until the protocol version
    /// for the latest epoch we have stored in the db, inclusive.
    ///
    /// `chain_id` must be a valid checkpoint-digest byte string; invalid
    /// bytes panic via `expect`. NOTE(review): consider propagating an error
    /// instead if callers can ever pass untrusted bytes.
    fn persist_protocol_configs_and_feature_flags(
        &self,
        chain_id: Vec<u8>,
    ) -> Result<(), IndexerError> {
        let chain_id = ChainIdentifier::from(
            CheckpointDigest::from_bytes(chain_id).expect("unable to convert chain id"),
        );

        let mut all_configs = vec![];
        let mut all_flags = vec![];

        let (start_version, end_version) = self.get_protocol_version_index_range()?;
        info!(
            "Persisting protocol configs with start_version: {}, end_version: {}",
            start_version, end_version
        );

        // Gather all protocol configs and feature flags for all versions between start
        // and end.
        for version in start_version..=end_version {
            // Unsupported versions are treated as an error rather than skipped.
            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
                (version as u64).into(),
                chain_id.chain(),
            )
            .ok_or(IndexerError::Generic(format!(
                "Unable to fetch protocol version {} and chain {:?}",
                version,
                chain_id.chain()
            )))?;
            // One row per (version, config name); values are stringified.
            let configs_vec = protocol_configs
                .attr_map()
                .into_iter()
                .map(|(k, v)| StoredProtocolConfig {
                    protocol_version: version,
                    config_name: k,
                    config_value: v.map(|v| v.to_string()),
                })
                .collect::<Vec<_>>();
            all_configs.extend(configs_vec);

            // One row per (version, feature flag).
            let feature_flags = protocol_configs
                .feature_map()
                .into_iter()
                .map(|(k, v)| StoredFeatureFlag {
                    protocol_version: version,
                    flag_name: k,
                    flag_value: v,
                })
                .collect::<Vec<_>>();
            all_flags.extend(feature_flags);
        }

        // Insert in bounded chunks within one retried transaction; existing
        // rows are left untouched (insert-or-ignore).
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
                }
                for flag_chunk in all_flags.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(feature_flags::table, flag_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;
        Ok(())
    }
2275
2276    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2277        if indices.is_empty() {
2278            return Ok(());
2279        }
2280        let len = indices.len();
2281        let guard = self
2282            .metrics
2283            .checkpoint_db_commit_latency_tx_indices
2284            .start_timer();
2285        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2286
2287        let futures = chunks.into_iter().map(|chunk| {
2288            self.spawn_task(move |this: Self| async move {
2289                this.persist_tx_indices_chunk_v2(chunk).await
2290            })
2291        });
2292        futures::future::try_join_all(futures)
2293            .await
2294            .map_err(|e| {
2295                tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
2296                IndexerError::from(e)
2297            })?
2298            .into_iter()
2299            .collect::<Result<Vec<_>, _>>()
2300            .map_err(|e| {
2301                IndexerError::PostgresWrite(format!(
2302                    "Failed to persist all tx_indices chunks: {e:?}"
2303                ))
2304            })?;
2305        let elapsed = guard.stop_and_record();
2306        info!(elapsed, "Persisted {} tx_indices chunks", len);
2307        Ok(())
2308    }
2309
2310    async fn persist_checkpoint_objects(
2311        &self,
2312        objects: Vec<CheckpointObjectChanges>,
2313    ) -> Result<(), IndexerError> {
2314        if objects.is_empty() {
2315            return Ok(());
2316        }
2317        let guard = self
2318            .metrics
2319            .checkpoint_db_commit_latency_objects
2320            .start_timer();
2321        let CheckpointObjectChanges {
2322            changed_objects: mutations,
2323            deleted_objects: deletions,
2324        } = retain_latest_objects_from_checkpoint_batch(objects);
2325        let mutation_len = mutations.len();
2326        let deletion_len = deletions.len();
2327
2328        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
2329        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
2330        let mutation_futures = mutation_chunks
2331            .into_iter()
2332            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
2333        let deletion_futures = deletion_chunks
2334            .into_iter()
2335            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
2336        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
2337            .await
2338            .map_err(|e| {
2339                tracing::error!("failed to join futures for persisting objects: {e}");
2340                IndexerError::from(e)
2341            })?
2342            .into_iter()
2343            .collect::<Result<Vec<_>, _>>()
2344            .map_err(|e| {
2345                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}",))
2346            })?;
2347
2348        let elapsed = guard.stop_and_record();
2349        info!(
2350            elapsed,
2351            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
2352        );
2353        Ok(())
2354    }
2355
2356    async fn persist_tx_global_order(
2357        &self,
2358        tx_order: Vec<TxGlobalOrder>,
2359    ) -> Result<(), IndexerError> {
2360        let guard = self
2361            .metrics
2362            .checkpoint_db_commit_latency_tx_insertion_order
2363            .start_timer();
2364        let len = tx_order.len();
2365
2366        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2367        let futures = chunks
2368            .into_iter()
2369            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));
2370
2371        futures::future::try_join_all(futures)
2372            .await
2373            .map_err(|e| {
2374                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}",);
2375                IndexerError::from(e)
2376            })?
2377            .into_iter()
2378            .collect::<Result<Vec<_>, _>>()
2379            .map_err(|e| {
2380                IndexerError::PostgresWrite(format!(
2381                    "Failed to persist all txs insertion order chunks: {e:?}",
2382                ))
2383            })?;
2384        let elapsed = guard.stop_and_record();
2385        info!(elapsed, "Persisted {len} txs insertion orders");
2386        Ok(())
2387    }
2388
2389    async fn update_watermarks_lower_bound(
2390        &self,
2391        watermarks: Vec<(PrunableTable, u64)>,
2392    ) -> Result<(), IndexerError> {
2393        self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
2394            .await
2395    }
2396
2397    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
2398        self.execute_in_blocking_worker(move |this| this.get_watermarks())
2399            .await
2400    }
2401
2402    async fn prune_table_by_checkpoint_range(
2403        &self,
2404        table: &crate::pruning::pruner::PrunableTable,
2405        min_checkpoint: u64,
2406        max_checkpoint: u64,
2407    ) -> Result<(), IndexerError> {
2408        let table_clone = *table;
2409        self.execute_in_blocking_worker(move |this| {
2410            this.prune_table_by_checkpoint_range(&table_clone, min_checkpoint, max_checkpoint)
2411        })
2412        .await
2413    }
2414
2415    async fn prune_table_by_tx_range(
2416        &self,
2417        table: &crate::pruning::pruner::PrunableTable,
2418        min_tx: u64,
2419        max_tx: u64,
2420    ) -> Result<(), IndexerError> {
2421        let table_clone = *table;
2422        self.execute_in_blocking_worker(move |this| {
2423            this.prune_single_tx_or_event_table(&table_clone, min_tx, max_tx)
2424        })
2425        .await
2426    }
2427
2428    async fn prune_table_by_global_seq_with_limit(
2429        &self,
2430        table: &crate::pruning::pruner::PrunableTable,
2431        start: u64,
2432        end: u64,
2433        limit: i64,
2434    ) -> Result<usize, IndexerError> {
2435        use crate::pruning::pruner::PrunableTable;
2436
2437        if !matches!(table, PrunableTable::OptimisticTransactions) {
2438            return Err(IndexerError::InvalidArgument(format!(
2439                "table {} does not support pruning by global order with limit",
2440                table.as_ref()
2441            )));
2442        }
2443
2444        self.execute_in_blocking_worker(move |this| {
2445            this.prune_optimistic_tx_by_global_seq(start, end, limit)
2446        })
2447        .await
2448    }
2449
2450    async fn update_watermark_lowest_unpruned_key(
2451        &self,
2452        table: &PrunableTable,
2453        lowest_unpruned_key: u64,
2454    ) -> Result<(), IndexerError> {
2455        let table = *table;
2456        self.execute_in_blocking_worker(move |this| {
2457            this.update_watermark_lowest_unpruned_key(&table, lowest_unpruned_key)
2458        })
2459        .await
2460    }
2461
2462    async fn get_watermark_by_entity(
2463        &self,
2464        entity: String,
2465    ) -> Result<Option<StoredWatermark>, IndexerError> {
2466        self.execute_in_blocking_worker(move |this| this.get_watermark_by_entity(&entity))
2467            .await
2468    }
2469}
2470
2471fn make_objects_history_to_commit(
2472    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2473) -> Result<Vec<StoredHistoryObject>, IndexerError> {
2474    let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2475        .clone()
2476        .into_iter()
2477        .flat_map(|changes| changes.deleted_objects)
2478        .map(|o| o.into())
2479        .collect();
2480    let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2481        .into_iter()
2482        .flat_map(|changes| changes.changed_objects)
2483        .map(StoredHistoryObject::try_from)
2484        .collect::<Result<Vec<_>, _>>()?;
2485    Ok(deleted_objects.into_iter().chain(mutated_objects).collect())
2486}
2487
2488/// Partitions object changes into deletions and mutations.
2489///
2490/// Retains only the highest version of each object among deletions and
2491/// mutations. This allows concurrent insertion into the DB of the resulting
2492/// partitions.
2493fn retain_latest_indexed_objects(
2494    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2495) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2496    use std::collections::HashMap;
2497
2498    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
2499    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();
2500
2501    for change in tx_object_changes {
2502        // Remove mutation / deletion with a following deletion / mutation,
2503        // as we expect that following deletion / mutation has a higher version.
2504        // Technically, assertions below are not required, double check just in case.
2505        for mutation in change.changed_objects {
2506            let id = mutation.object.id();
2507            let version = mutation.object.version();
2508
2509            if let Some(existing) = deletions.remove(&id) {
2510                assert!(
2511                    existing.object_version < version,
2512                    "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2513                    existing.object_version
2514                );
2515            }
2516
2517            if let Some(existing) = mutations.insert(id, mutation) {
2518                assert!(
2519                    existing.object.version() < version,
2520                    "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2521                    existing.object.version()
2522                );
2523            }
2524        }
2525        // Handle deleted objects
2526        for deletion in change.deleted_objects {
2527            let id = deletion.object_id;
2528            let version = deletion.object_version;
2529
2530            if let Some(existing) = mutations.remove(&id) {
2531                assert!(
2532                    existing.object.version() < version,
2533                    "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2534                    existing.object.version(),
2535                );
2536            }
2537
2538            if let Some(existing) = deletions.insert(id, deletion) {
2539                assert!(
2540                    existing.object_version < version,
2541                    "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2542                    existing.object_version
2543                );
2544            }
2545        }
2546    }
2547
2548    (
2549        mutations.into_values().collect(),
2550        deletions.into_values().collect(),
2551    )
2552}