iota_indexer/store/
pg_indexer_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use core::result::Result::Ok;
6use std::{any::Any as StdAny, collections::BTreeMap, time::Duration};
7
8use async_trait::async_trait;
9use diesel::{
10    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
11    dsl::{max, min},
12    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
13    upsert::excluded,
14};
15use downcast::Any;
16use iota_protocol_config::ProtocolConfig;
17use iota_types::{
18    base_types::ObjectID,
19    digests::{ChainIdentifier, CheckpointDigest},
20};
21use itertools::Itertools;
22use tap::TapFallible;
23use tracing::info;
24
25use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
26use crate::{
27    db::ConnectionPool,
28    errors::{Context, IndexerError},
29    handlers::{EpochToCommit, TransactionObjectChangesToCommit},
30    insert_or_ignore_into,
31    metrics::IndexerMetrics,
32    models::{
33        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
34        display::StoredDisplay,
35        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
36        events::StoredEvent,
37        obj_indices::StoredObjectVersion,
38        objects::{
39            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
40            StoredObjects,
41        },
42        packages::StoredPackage,
43        transactions::{
44            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
45        },
46        tx_indices::TxIndexV2Split,
47    },
48    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
49    persist_chunk_into_table_in_existing_connection, read_only_blocking,
50    rolling::transform::{
51        CheckpointObjectChanges, LiveObject, RemovedObject,
52        retain_latest_objects_from_checkpoint_batch,
53    },
54    schema::{
55        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
56        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
57        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
58        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
59        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
60        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
61        tx_wrapped_or_deleted_objects,
62    },
63    store::{IndexerStore, IndexerStoreExt},
64    transactional_blocking_with_retry,
65    types::{
66        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
67        IndexedPackage, IndexedTransaction, TxIndex, TxIndexV2,
68    },
69};
70
71#[macro_export]
72macro_rules! chunk {
73    ($data: expr, $size: expr) => {{
74        $data
75            .into_iter()
76            .chunks($size)
77            .into_iter()
78            .map(|c| c.collect())
79            .collect::<Vec<Vec<_>>>()
80    }};
81}
82
83macro_rules! prune_tx_or_event_indice_table {
84    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
85        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
86            .execute($conn)
87            .map_err(IndexerError::from)
88            .context($context_msg)?;
89    };
90}
91
92// In one DB transaction, the update could be chunked into
93// a few statements, this is the amount of rows to update in one statement
94// TODO: I think with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX`
95// is now less relevant. We should do experiments and remove it if it's true.
96const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
97// The amount of rows to update in one DB transaction
98const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
99// The amount of rows to update in one DB transaction, for objects particularly
100// Having this number too high may cause many db deadlocks because of
101// optimistic locking.
102const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
103const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
104
105#[derive(Clone)]
106pub struct PgIndexerStoreConfig {
107    pub parallel_chunk_size: usize,
108    pub parallel_objects_chunk_size: usize,
109}
110
111pub struct PgIndexerStore {
112    blocking_cp: ConnectionPool,
113    metrics: IndexerMetrics,
114    partition_manager: PgPartitionManager,
115    config: PgIndexerStoreConfig,
116}
117
118impl Clone for PgIndexerStore {
119    fn clone(&self) -> PgIndexerStore {
120        Self {
121            blocking_cp: self.blocking_cp.clone(),
122            metrics: self.metrics.clone(),
123            partition_manager: self.partition_manager.clone(),
124            config: self.config.clone(),
125        }
126    }
127}
128
129impl PgIndexerStore {
130    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
131        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
132            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
133            .parse::<usize>()
134            .unwrap();
135        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
136            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
137            .parse::<usize>()
138            .unwrap();
139        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
140            .expect("Failed to initialize partition manager");
141        let config = PgIndexerStoreConfig {
142            parallel_chunk_size,
143            parallel_objects_chunk_size,
144        };
145
146        Self {
147            blocking_cp,
148            metrics,
149            partition_manager,
150            config,
151        }
152    }
153
154    pub fn blocking_cp(&self) -> ConnectionPool {
155        self.blocking_cp.clone()
156    }
157
158    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
159        read_only_blocking!(&self.blocking_cp, |conn| {
160            epochs::dsl::epochs
161                .select(max(epochs::epoch))
162                .first::<Option<i64>>(conn)
163                .map(|v| v.map(|v| v as u64))
164        })
165        .context("Failed reading latest epoch id from PostgresDB")
166    }
167
168    /// Get the range of the protocol versions that need to be indexed.
169    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
170        // We start indexing from the next protocol version after the latest one stored
171        // in the db.
172        let start = read_only_blocking!(&self.blocking_cp, |conn| {
173            protocol_configs::dsl::protocol_configs
174                .select(max(protocol_configs::protocol_version))
175                .first::<Option<i64>>(conn)
176        })
177        .context("Failed reading latest protocol version from PostgresDB")?
178        .map_or(1, |v| v + 1);
179
180        // We end indexing at the protocol version of the latest epoch stored in the db.
181        let end = read_only_blocking!(&self.blocking_cp, |conn| {
182            epochs::dsl::epochs
183                .select(max(epochs::protocol_version))
184                .first::<Option<i64>>(conn)
185        })
186        .context("Failed reading latest epoch protocol version from PostgresDB")?
187        .unwrap_or(1);
188        Ok((start, end))
189    }
190
191    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
192        read_only_blocking!(&self.blocking_cp, |conn| {
193            chain_identifier::dsl::chain_identifier
194                .select(chain_identifier::checkpoint_digest)
195                .first::<Vec<u8>>(conn)
196                .optional()
197        })
198        .context("Failed reading chain id from PostgresDB")
199    }
200
201    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
202        read_only_blocking!(&self.blocking_cp, |conn| {
203            checkpoints::dsl::checkpoints
204                .select(max(checkpoints::sequence_number))
205                .first::<Option<i64>>(conn)
206                .map(|v| v.map(|v| v as u64))
207        })
208        .context("Failed reading latest checkpoint sequence number from PostgresDB")
209    }
210
211    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
212        read_only_blocking!(&self.blocking_cp, |conn| {
213            checkpoints::dsl::checkpoints
214                .select((
215                    min(checkpoints::sequence_number),
216                    max(checkpoints::sequence_number),
217                ))
218                .first::<(Option<i64>, Option<i64>)>(conn)
219                .map(|(min, max)| {
220                    (
221                        min.unwrap_or_default() as u64,
222                        max.unwrap_or_default() as u64,
223                    )
224                })
225        })
226        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
227    }
228
229    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
230        read_only_blocking!(&self.blocking_cp, |conn| {
231            epochs::dsl::epochs
232                .select((min(epochs::epoch), max(epochs::epoch)))
233                .first::<(Option<i64>, Option<i64>)>(conn)
234                .map(|(min, max)| {
235                    (
236                        min.unwrap_or_default() as u64,
237                        max.unwrap_or_default() as u64,
238                    )
239                })
240        })
241        .context("Failed reading min and max epoch numbers from PostgresDB")
242    }
243
244    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
245        read_only_blocking!(&self.blocking_cp, |conn| {
246            pruner_cp_watermark::dsl::pruner_cp_watermark
247                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
248                .first::<Option<i64>>(conn)
249                .map(|v| v.unwrap_or_default() as u64)
250        })
251        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
252    }
253
254    fn get_checkpoint_range_for_epoch(
255        &self,
256        epoch: u64,
257    ) -> Result<(u64, Option<u64>), IndexerError> {
258        read_only_blocking!(&self.blocking_cp, |conn| {
259            epochs::dsl::epochs
260                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
261                .filter(epochs::epoch.eq(epoch as i64))
262                .first::<(i64, Option<i64>)>(conn)
263                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
264        })
265        .context("Failed reading checkpoint range from PostgresDB")
266    }
267
268    fn get_transaction_range_for_checkpoint(
269        &self,
270        checkpoint: u64,
271    ) -> Result<(u64, u64), IndexerError> {
272        read_only_blocking!(&self.blocking_cp, |conn| {
273            pruner_cp_watermark::dsl::pruner_cp_watermark
274                .select((
275                    pruner_cp_watermark::min_tx_sequence_number,
276                    pruner_cp_watermark::max_tx_sequence_number,
277                ))
278                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
279                .first::<(i64, i64)>(conn)
280                .map(|(min, max)| (min as u64, max as u64))
281        })
282        .context("Failed reading transaction range from PostgresDB")
283    }
284
285    fn get_latest_object_snapshot_checkpoint_sequence_number(
286        &self,
287    ) -> Result<Option<u64>, IndexerError> {
288        read_only_blocking!(&self.blocking_cp, |conn| {
289            objects_snapshot::dsl::objects_snapshot
290                .select(max(objects_snapshot::checkpoint_sequence_number))
291                .first::<Option<i64>>(conn)
292                .map(|v| v.map(|v| v as u64))
293        })
294        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
295    }
296
297    fn persist_display_updates(
298        &self,
299        display_updates: BTreeMap<String, StoredDisplay>,
300    ) -> Result<(), IndexerError> {
301        transactional_blocking_with_retry!(
302            &self.blocking_cp,
303            {
304                let value = display_updates.values().collect::<Vec<_>>();
305                |conn| self.persist_displays_in_existing_transaction(conn, value)
306            },
307            PG_DB_COMMIT_SLEEP_DURATION
308        )?;
309
310        Ok(())
311    }
312
313    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
314        let guard = self
315            .metrics
316            .checkpoint_db_commit_latency_objects_chunks
317            .start_timer();
318        let len = objects.len();
319        let raw_query = r#"
320            INSERT INTO objects (
321                object_id,
322                object_version,
323                object_digest,
324                owner_type,
325                owner_id,
326                object_type,
327                object_type_package,
328                object_type_module,
329                object_type_name,
330                serialized_object,
331                coin_type,
332                coin_balance,
333                df_kind
334            )
335            SELECT
336                u.object_id,
337                u.object_version,
338                u.object_digest,
339                u.owner_type,
340                u.owner_id,
341                u.object_type,
342                u.object_type_package,
343                u.object_type_module,
344                u.object_type_name,
345                u.serialized_object,
346                u.coin_type,
347                u.coin_balance,
348                u.df_kind
349            FROM UNNEST(
350                $1::BYTEA[],
351                $2::BIGINT[],
352                $3::BYTEA[],
353                $4::SMALLINT[],
354                $5::BYTEA[],
355                $6::TEXT[],
356                $7::BYTEA[],
357                $8::TEXT[],
358                $9::TEXT[],
359                $10::BYTEA[],
360                $11::TEXT[],
361                $12::BIGINT[],
362                $13::SMALLINT[],
363                $14::BYTEA[]
364            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest)
365            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
366            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
367            ON CONFLICT (object_id) DO UPDATE
368            SET
369                object_version = EXCLUDED.object_version,
370                object_digest = EXCLUDED.object_digest,
371                owner_type = EXCLUDED.owner_type,
372                owner_id = EXCLUDED.owner_id,
373                object_type = EXCLUDED.object_type,
374                object_type_package = EXCLUDED.object_type_package,
375                object_type_module = EXCLUDED.object_type_module,
376                object_type_name = EXCLUDED.object_type_name,
377                serialized_object = EXCLUDED.serialized_object,
378                coin_type = EXCLUDED.coin_type,
379                coin_balance = EXCLUDED.coin_balance,
380                df_kind = EXCLUDED.df_kind
381        "#;
382        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
383            .into_iter()
384            .map(LiveObject::split)
385            .map(|(indexed_object, tx_digest)| {
386                (
387                    StoredObject::from(indexed_object),
388                    tx_digest.into_inner().to_vec(),
389                )
390            })
391            .unzip();
392        let query = diesel::sql_query(raw_query)
393            .bind::<Array<Bytea>, _>(objects.object_ids)
394            .bind::<Array<BigInt>, _>(objects.object_versions)
395            .bind::<Array<Bytea>, _>(objects.object_digests)
396            .bind::<Array<SmallInt>, _>(objects.owner_types)
397            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
398            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
399            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
400            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
401            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
402            .bind::<Array<Bytea>, _>(objects.serialized_objects)
403            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
404            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
405            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
406            .bind::<Array<Bytea>, _>(tx_digests);
407        transactional_blocking_with_retry!(
408            &self.blocking_cp,
409            |conn| {
410                query.clone().execute(conn)?;
411                Ok::<(), IndexerError>(())
412            },
413            PG_DB_COMMIT_SLEEP_DURATION
414        )
415        .tap_ok(|_| {
416            let elapsed = guard.stop_and_record();
417            info!(elapsed, "Persisted {len} chunked objects");
418        })
419        .tap_err(|e| {
420            tracing::error!("Failed to persist object mutations with error: {e}");
421        })
422    }
423
424    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
425        let guard = self
426            .metrics
427            .checkpoint_db_commit_latency_objects_chunks
428            .start_timer();
429        let len = objects.len();
430        let raw_query = r#"
431            DELETE FROM objects
432            WHERE object_id IN (
433                SELECT u.object_id
434                FROM UNNEST(
435                    $1::BYTEA[],
436                    $2::BYTEA[]
437                ) AS u(object_id, tx_digest)
438                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
439                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
440            )
441        "#;
442        let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
443            .into_iter()
444            .map(|removed_object| {
445                (
446                    removed_object.object_id().to_vec(),
447                    removed_object.transaction_digest.into_inner().to_vec(),
448                )
449            })
450            .unzip();
451        let query = diesel::sql_query(raw_query)
452            .bind::<Array<Bytea>, _>(object_ids)
453            .bind::<Array<Bytea>, _>(tx_digests);
454        transactional_blocking_with_retry!(
455            &self.blocking_cp,
456            |conn| {
457                query.clone().execute(conn)?;
458                Ok::<(), IndexerError>(())
459            },
460            PG_DB_COMMIT_SLEEP_DURATION
461        )
462        .tap_ok(|_| {
463            let elapsed = guard.stop_and_record();
464            info!(elapsed, "Deleted {len} chunked objects");
465        })
466        .tap_err(|e| {
467            tracing::error!("Failed to persist object deletions with error: {e}");
468        })
469    }
470
471    fn persist_object_mutation_chunk(
472        &self,
473        mutated_object_mutation_chunk: Vec<StoredObject>,
474    ) -> Result<(), IndexerError> {
475        let guard = self
476            .metrics
477            .checkpoint_db_commit_latency_objects_chunks
478            .start_timer();
479        let len = mutated_object_mutation_chunk.len();
480        transactional_blocking_with_retry!(
481            &self.blocking_cp,
482            |conn| {
483                self.persist_object_mutation_chunk_in_existing_transaction(
484                    conn,
485                    mutated_object_mutation_chunk.clone(),
486                )
487            },
488            PG_DB_COMMIT_SLEEP_DURATION
489        )
490        .tap_ok(|_| {
491            let elapsed = guard.stop_and_record();
492            info!(elapsed, "Persisted {} chunked objects", len);
493        })
494        .tap_err(|e| {
495            tracing::error!("Failed to persist object mutations with error: {}", e);
496        })
497    }
498
499    fn persist_object_mutation_chunk_in_existing_transaction(
500        &self,
501        conn: &mut PgConnection,
502        mutated_object_mutation_chunk: Vec<StoredObject>,
503    ) -> Result<(), IndexerError> {
504        on_conflict_do_update!(
505            objects::table,
506            mutated_object_mutation_chunk,
507            objects::object_id,
508            (
509                objects::object_id.eq(excluded(objects::object_id)),
510                objects::object_version.eq(excluded(objects::object_version)),
511                objects::object_digest.eq(excluded(objects::object_digest)),
512                objects::owner_type.eq(excluded(objects::owner_type)),
513                objects::owner_id.eq(excluded(objects::owner_id)),
514                objects::object_type.eq(excluded(objects::object_type)),
515                objects::serialized_object.eq(excluded(objects::serialized_object)),
516                objects::coin_type.eq(excluded(objects::coin_type)),
517                objects::coin_balance.eq(excluded(objects::coin_balance)),
518                objects::df_kind.eq(excluded(objects::df_kind)),
519            ),
520            conn
521        );
522        Ok::<(), IndexerError>(())
523    }
524
525    fn persist_object_deletion_chunk(
526        &self,
527        deleted_objects_chunk: Vec<StoredDeletedObject>,
528    ) -> Result<(), IndexerError> {
529        let guard = self
530            .metrics
531            .checkpoint_db_commit_latency_objects_chunks
532            .start_timer();
533        let len = deleted_objects_chunk.len();
534        transactional_blocking_with_retry!(
535            &self.blocking_cp,
536            |conn| {
537                self.persist_object_deletion_chunk_in_existing_transaction(
538                    conn,
539                    deleted_objects_chunk.clone(),
540                )
541            },
542            PG_DB_COMMIT_SLEEP_DURATION
543        )
544        .tap_ok(|_| {
545            let elapsed = guard.stop_and_record();
546            info!(elapsed, "Deleted {} chunked objects", len);
547        })
548        .tap_err(|e| {
549            tracing::error!("Failed to persist object deletions with error: {}", e);
550        })
551    }
552
553    fn persist_object_deletion_chunk_in_existing_transaction(
554        &self,
555        conn: &mut PgConnection,
556        deleted_objects_chunk: Vec<StoredDeletedObject>,
557    ) -> Result<(), IndexerError> {
558        diesel::delete(
559            objects::table.filter(
560                objects::object_id.eq_any(
561                    deleted_objects_chunk
562                        .iter()
563                        .map(|o| o.object_id.clone())
564                        .collect::<Vec<_>>(),
565                ),
566            ),
567        )
568        .execute(conn)
569        .map_err(IndexerError::from)
570        .context("Failed to write object deletion to PostgresDB")?;
571
572        Ok::<(), IndexerError>(())
573    }
574
575    fn backfill_objects_snapshot_chunk(
576        &self,
577        objects_snapshot: Vec<StoredObjectSnapshot>,
578    ) -> Result<(), IndexerError> {
579        let guard = self
580            .metrics
581            .checkpoint_db_commit_latency_objects_snapshot_chunks
582            .start_timer();
583        transactional_blocking_with_retry!(
584            &self.blocking_cp,
585            |conn| {
586                for objects_snapshot_chunk in
587                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
588                {
589                    on_conflict_do_update!(
590                        objects_snapshot::table,
591                        objects_snapshot_chunk,
592                        objects_snapshot::object_id,
593                        (
594                            objects_snapshot::object_version
595                                .eq(excluded(objects_snapshot::object_version)),
596                            objects_snapshot::object_status
597                                .eq(excluded(objects_snapshot::object_status)),
598                            objects_snapshot::object_digest
599                                .eq(excluded(objects_snapshot::object_digest)),
600                            objects_snapshot::checkpoint_sequence_number
601                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
602                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
603                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
604                            objects_snapshot::object_type_package
605                                .eq(excluded(objects_snapshot::object_type_package)),
606                            objects_snapshot::object_type_module
607                                .eq(excluded(objects_snapshot::object_type_module)),
608                            objects_snapshot::object_type_name
609                                .eq(excluded(objects_snapshot::object_type_name)),
610                            objects_snapshot::object_type
611                                .eq(excluded(objects_snapshot::object_type)),
612                            objects_snapshot::serialized_object
613                                .eq(excluded(objects_snapshot::serialized_object)),
614                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
615                            objects_snapshot::coin_balance
616                                .eq(excluded(objects_snapshot::coin_balance)),
617                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
618                        ),
619                        conn
620                    );
621                }
622                Ok::<(), IndexerError>(())
623            },
624            PG_DB_COMMIT_SLEEP_DURATION
625        )
626        .tap_ok(|_| {
627            let elapsed = guard.stop_and_record();
628            info!(
629                elapsed,
630                "Persisted {} chunked objects snapshot",
631                objects_snapshot.len(),
632            );
633        })
634        .tap_err(|e| {
635            tracing::error!("Failed to persist object snapshot with error: {}", e);
636        })
637    }
638
639    fn persist_objects_history_chunk(
640        &self,
641        stored_objects_history: Vec<StoredHistoryObject>,
642    ) -> Result<(), IndexerError> {
643        let guard = self
644            .metrics
645            .checkpoint_db_commit_latency_objects_history_chunks
646            .start_timer();
647        transactional_blocking_with_retry!(
648            &self.blocking_cp,
649            |conn| {
650                for stored_objects_history_chunk in
651                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
652                {
653                    insert_or_ignore_into!(
654                        objects_history::table,
655                        stored_objects_history_chunk,
656                        conn
657                    );
658                }
659                Ok::<(), IndexerError>(())
660            },
661            PG_DB_COMMIT_SLEEP_DURATION
662        )
663        .tap_ok(|_| {
664            let elapsed = guard.stop_and_record();
665            info!(
666                elapsed,
667                "Persisted {} chunked objects history",
668                stored_objects_history.len(),
669            );
670        })
671        .tap_err(|e| {
672            tracing::error!("Failed to persist object history with error: {}", e);
673        })
674    }
675
676    fn persist_object_version_chunk(
677        &self,
678        object_versions: Vec<StoredObjectVersion>,
679    ) -> Result<(), IndexerError> {
680        let guard = self
681            .metrics
682            .checkpoint_db_commit_latency_objects_version_chunks
683            .start_timer();
684
685        transactional_blocking_with_retry!(
686            &self.blocking_cp,
687            |conn| {
688                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
689                {
690                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
691                }
692                Ok::<(), IndexerError>(())
693            },
694            PG_DB_COMMIT_SLEEP_DURATION
695        )
696        .tap_ok(|_| {
697            let elapsed = guard.stop_and_record();
698            info!(
699                elapsed,
700                "Persisted {} chunked object versions",
701                object_versions.len(),
702            );
703        })
704        .tap_err(|e| {
705            tracing::error!("Failed to persist object versions with error: {e}");
706        })
707    }
708
709    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
710        let Some(first_checkpoint) = checkpoints.first() else {
711            return Ok(());
712        };
713
714        // If the first checkpoint has sequence number 0, we need to persist the digest
715        // as chain identifier.
716        if first_checkpoint.sequence_number == 0 {
717            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
718            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
719            transactional_blocking_with_retry!(
720                &self.blocking_cp,
721                |conn| {
722                    let checkpoint_digest =
723                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
724                    insert_or_ignore_into!(
725                        chain_identifier::table,
726                        StoredChainIdentifier { checkpoint_digest },
727                        conn
728                    );
729                    Ok::<(), IndexerError>(())
730                },
731                PG_DB_COMMIT_SLEEP_DURATION
732            )?;
733        }
734        let guard = self
735            .metrics
736            .checkpoint_db_commit_latency_checkpoints
737            .start_timer();
738
739        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
740        transactional_blocking_with_retry!(
741            &self.blocking_cp,
742            |conn| {
743                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
744                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
745                }
746                Ok::<(), IndexerError>(())
747            },
748            PG_DB_COMMIT_SLEEP_DURATION
749        )
750        .tap_ok(|_| {
751            info!(
752                "Persisted {} pruner_cp_watermark rows.",
753                stored_cp_txs.len(),
754            );
755        })
756        .tap_err(|e| {
757            tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
758        })?;
759
760        let stored_checkpoints = checkpoints
761            .iter()
762            .map(StoredCheckpoint::from)
763            .collect::<Vec<_>>();
764        transactional_blocking_with_retry!(
765            &self.blocking_cp,
766            |conn| {
767                for stored_checkpoint_chunk in
768                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
769                {
770                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
771                    let time_now_ms = chrono::Utc::now().timestamp_millis();
772                    for stored_checkpoint in stored_checkpoint_chunk {
773                        self.metrics
774                            .db_commit_lag_ms
775                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
776                        self.metrics.max_committed_checkpoint_sequence_number.set(
777                            stored_checkpoint.sequence_number,
778                        );
779                        self.metrics.committed_checkpoint_timestamp_ms.set(
780                            stored_checkpoint.timestamp_ms,
781                        );
782                    }
783                    for stored_checkpoint in stored_checkpoint_chunk {
784                        info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
785                    }
786                }
787                Ok::<(), IndexerError>(())
788            },
789            PG_DB_COMMIT_SLEEP_DURATION
790        )
791        .tap_ok(|_| {
792            let elapsed = guard.stop_and_record();
793            info!(
794                elapsed,
795                "Persisted {} checkpoints",
796                stored_checkpoints.len()
797            );
798        })
799        .tap_err(|e| {
800            tracing::error!("Failed to persist checkpoints with error: {}", e);
801        })
802    }
803
804    fn persist_transactions_chunk(
805        &self,
806        transactions: Vec<IndexedTransaction>,
807    ) -> Result<(), IndexerError> {
808        let guard = self
809            .metrics
810            .checkpoint_db_commit_latency_transactions_chunks
811            .start_timer();
812        let transformation_guard = self
813            .metrics
814            .checkpoint_db_commit_latency_transactions_chunks_transformation
815            .start_timer();
816        let transactions = transactions
817            .iter()
818            .map(StoredTransaction::from)
819            .collect::<Vec<_>>();
820        drop(transformation_guard);
821
822        transactional_blocking_with_retry!(
823            &self.blocking_cp,
824            |conn| {
825                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
826                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
827                }
828                Ok::<(), IndexerError>(())
829            },
830            PG_DB_COMMIT_SLEEP_DURATION
831        )
832        .tap_ok(|_| {
833            let elapsed = guard.stop_and_record();
834            info!(
835                elapsed,
836                "Persisted {} chunked transactions",
837                transactions.len()
838            );
839        })
840        .tap_err(|e| {
841            tracing::error!("Failed to persist transactions with error: {}", e);
842        })
843    }
844
845    fn persist_tx_global_order_chunk(
846        &self,
847        tx_order: Vec<CheckpointTxGlobalOrder>,
848    ) -> Result<(), IndexerError> {
849        let guard = self
850            .metrics
851            .checkpoint_db_commit_latency_tx_insertion_order_chunks
852            .start_timer();
853
854        transactional_blocking_with_retry!(
855            &self.blocking_cp,
856            |conn| {
857                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
858                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
859                }
860                Ok::<(), IndexerError>(())
861            },
862            PG_DB_COMMIT_SLEEP_DURATION
863        )
864        .tap_ok(|_| {
865            let elapsed = guard.stop_and_record();
866            info!(
867                elapsed,
868                "Persisted {} chunked txs insertion order",
869                tx_order.len()
870            );
871        })
872        .tap_err(|e| {
873            tracing::error!("Failed to persist txs insertion order with error: {e}");
874        })
875    }
876
877    /// We enforce index-status semantics for checkpointed transactions
878    /// in `tx_global_order`.
879    ///
880    /// Namely, checkpointed transactions (i.e. with `optimistic_sequence_number
881    /// == 0`) are updated to `optimistic_sequence_number == -1` to indicate
882    /// that they have been persisted in the database.
883    fn update_status_for_checkpoint_transactions_chunk(
884        &self,
885        tx_order: Vec<CheckpointTxGlobalOrder>,
886    ) -> Result<(), IndexerError> {
887        let guard = self
888            .metrics
889            .checkpoint_db_commit_latency_tx_insertion_order_chunks
890            .start_timer();
891
892        let num_transactions = tx_order.len();
893        transactional_blocking_with_retry!(
894            &self.blocking_cp,
895            |conn| {
896                on_conflict_do_update_with_condition!(
897                    tx_global_order::table,
898                    tx_order.clone(),
899                    tx_global_order::tx_digest,
900                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
901                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
902                    conn
903                );
904                on_conflict_do_update_with_condition!(
905                    tx_global_order::table,
906                    tx_order.clone(),
907                    tx_global_order::tx_digest,
908                    tx_global_order::chk_tx_sequence_number
909                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
910                    tx_global_order::chk_tx_sequence_number.is_null(),
911                    conn
912                );
913                Ok::<(), IndexerError>(())
914            },
915            PG_DB_COMMIT_SLEEP_DURATION
916        )
917        .tap_ok(|_| {
918            let elapsed = guard.stop_and_record();
919            info!(
920                elapsed,
921                "Updated {} chunked values of `tx_global_order`", num_transactions
922            );
923        })
924        .tap_err(|e| {
925            tracing::error!("Failed to update `tx_global_order` with error: {e}");
926        })
927    }
928
929    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
930        let guard = self
931            .metrics
932            .checkpoint_db_commit_latency_events_chunks
933            .start_timer();
934        let len = events.len();
935        let events = events
936            .into_iter()
937            .map(StoredEvent::from)
938            .collect::<Vec<_>>();
939
940        transactional_blocking_with_retry!(
941            &self.blocking_cp,
942            |conn| {
943                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
944                    insert_or_ignore_into!(events::table, event_chunk, conn);
945                }
946                Ok::<(), IndexerError>(())
947            },
948            PG_DB_COMMIT_SLEEP_DURATION
949        )
950        .tap_ok(|_| {
951            let elapsed = guard.stop_and_record();
952            info!(elapsed, "Persisted {} chunked events", len);
953        })
954        .tap_err(|e| {
955            tracing::error!("Failed to persist events with error: {}", e);
956        })
957    }
958
959    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
960        if packages.is_empty() {
961            return Ok(());
962        }
963        let guard = self
964            .metrics
965            .checkpoint_db_commit_latency_packages
966            .start_timer();
967        let packages = packages
968            .into_iter()
969            .map(StoredPackage::from)
970            .collect::<Vec<_>>();
971        transactional_blocking_with_retry!(
972            &self.blocking_cp,
973            |conn| {
974                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
975                    on_conflict_do_update!(
976                        packages::table,
977                        packages_chunk,
978                        packages::package_id,
979                        (
980                            packages::package_id.eq(excluded(packages::package_id)),
981                            packages::move_package.eq(excluded(packages::move_package)),
982                        ),
983                        conn
984                    );
985                }
986                Ok::<(), IndexerError>(())
987            },
988            PG_DB_COMMIT_SLEEP_DURATION
989        )
990        .tap_ok(|_| {
991            let elapsed = guard.stop_and_record();
992            info!(elapsed, "Persisted {} packages", packages.len());
993        })
994        .tap_err(|e| {
995            tracing::error!("Failed to persist packages with error: {}", e);
996        })
997    }
998
999    async fn persist_event_indices_chunk(
1000        &self,
1001        indices: Vec<EventIndex>,
1002    ) -> Result<(), IndexerError> {
1003        let guard = self
1004            .metrics
1005            .checkpoint_db_commit_latency_event_indices_chunks
1006            .start_timer();
1007        let len = indices.len();
1008        let (
1009            event_emit_packages,
1010            event_emit_modules,
1011            event_senders,
1012            event_struct_packages,
1013            event_struct_modules,
1014            event_struct_names,
1015            event_struct_instantiations,
1016        ) = indices.into_iter().map(|i| i.split()).fold(
1017            (
1018                Vec::new(),
1019                Vec::new(),
1020                Vec::new(),
1021                Vec::new(),
1022                Vec::new(),
1023                Vec::new(),
1024                Vec::new(),
1025            ),
1026            |(
1027                mut event_emit_packages,
1028                mut event_emit_modules,
1029                mut event_senders,
1030                mut event_struct_packages,
1031                mut event_struct_modules,
1032                mut event_struct_names,
1033                mut event_struct_instantiations,
1034            ),
1035             index| {
1036                event_emit_packages.push(index.0);
1037                event_emit_modules.push(index.1);
1038                event_senders.push(index.2);
1039                event_struct_packages.push(index.3);
1040                event_struct_modules.push(index.4);
1041                event_struct_names.push(index.5);
1042                event_struct_instantiations.push(index.6);
1043                (
1044                    event_emit_packages,
1045                    event_emit_modules,
1046                    event_senders,
1047                    event_struct_packages,
1048                    event_struct_modules,
1049                    event_struct_names,
1050                    event_struct_instantiations,
1051                )
1052            },
1053        );
1054
1055        // Now persist all the event indices in parallel into their tables.
1056        let mut futures = vec![];
1057        futures.push(self.spawn_blocking_task(move |this| {
1058            persist_chunk_into_table!(
1059                event_emit_package::table,
1060                event_emit_packages,
1061                &this.blocking_cp
1062            )
1063        }));
1064
1065        futures.push(self.spawn_blocking_task(move |this| {
1066            persist_chunk_into_table!(
1067                event_emit_module::table,
1068                event_emit_modules,
1069                &this.blocking_cp
1070            )
1071        }));
1072
1073        futures.push(self.spawn_blocking_task(move |this| {
1074            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
1075        }));
1076
1077        futures.push(self.spawn_blocking_task(move |this| {
1078            persist_chunk_into_table!(
1079                event_struct_package::table,
1080                event_struct_packages,
1081                &this.blocking_cp
1082            )
1083        }));
1084
1085        futures.push(self.spawn_blocking_task(move |this| {
1086            persist_chunk_into_table!(
1087                event_struct_module::table,
1088                event_struct_modules,
1089                &this.blocking_cp
1090            )
1091        }));
1092
1093        futures.push(self.spawn_blocking_task(move |this| {
1094            persist_chunk_into_table!(
1095                event_struct_name::table,
1096                event_struct_names,
1097                &this.blocking_cp
1098            )
1099        }));
1100
1101        futures.push(self.spawn_blocking_task(move |this| {
1102            persist_chunk_into_table!(
1103                event_struct_instantiation::table,
1104                event_struct_instantiations,
1105                &this.blocking_cp
1106            )
1107        }));
1108
1109        futures::future::try_join_all(futures)
1110            .await
1111            .map_err(|e| {
1112                tracing::error!("Failed to join event indices futures in a chunk: {}", e);
1113                IndexerError::from(e)
1114            })?
1115            .into_iter()
1116            .collect::<Result<Vec<_>, _>>()
1117            .map_err(|e| {
1118                IndexerError::PostgresWrite(format!(
1119                    "Failed to persist all event indices in a chunk: {e:?}"
1120                ))
1121            })?;
1122        let elapsed = guard.stop_and_record();
1123        info!(elapsed, "Persisted {} chunked event indices", len);
1124        Ok(())
1125    }
1126
1127    async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1128        let guard = self
1129            .metrics
1130            .checkpoint_db_commit_latency_tx_indices_chunks
1131            .start_timer();
1132        let len = indices.len();
1133        let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
1134            indices.into_iter().map(|i| i.split()).fold(
1135                (
1136                    Vec::new(),
1137                    Vec::new(),
1138                    Vec::new(),
1139                    Vec::new(),
1140                    Vec::new(),
1141                    Vec::new(),
1142                    Vec::new(),
1143                    Vec::new(),
1144                    Vec::new(),
1145                ),
1146                |(
1147                    mut tx_senders,
1148                    mut tx_recipients,
1149                    mut tx_input_objects,
1150                    mut tx_changed_objects,
1151                    mut tx_pkgs,
1152                    mut tx_mods,
1153                    mut tx_funs,
1154                    mut tx_digests,
1155                    mut tx_kinds,
1156                ),
1157                 index| {
1158                    tx_senders.extend(index.0);
1159                    tx_recipients.extend(index.1);
1160                    tx_input_objects.extend(index.2);
1161                    tx_changed_objects.extend(index.3);
1162                    tx_pkgs.extend(index.4);
1163                    tx_mods.extend(index.5);
1164                    tx_funs.extend(index.6);
1165                    tx_digests.extend(index.7);
1166                    tx_kinds.extend(index.8);
1167                    (
1168                        tx_senders,
1169                        tx_recipients,
1170                        tx_input_objects,
1171                        tx_changed_objects,
1172                        tx_pkgs,
1173                        tx_mods,
1174                        tx_funs,
1175                        tx_digests,
1176                        tx_kinds,
1177                    )
1178                },
1179            );
1180
1181        let futures = [
1182            self.spawn_blocking_task(move |this| {
1183                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1184            }),
1185            self.spawn_blocking_task(move |this| {
1186                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1187            }),
1188            self.spawn_blocking_task(move |this| {
1189                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1190            }),
1191            self.spawn_blocking_task(move |this| {
1192                persist_chunk_into_table!(
1193                    tx_changed_objects::table,
1194                    changed_objects,
1195                    &this.blocking_cp
1196                )
1197            }),
1198            self.spawn_blocking_task(move |this| {
1199                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1200            }),
1201            self.spawn_blocking_task(move |this| {
1202                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1203            }),
1204            self.spawn_blocking_task(move |this| {
1205                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1206            }),
1207            self.spawn_blocking_task(move |this| {
1208                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1209            }),
1210            self.spawn_blocking_task(move |this| {
1211                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1212            }),
1213        ];
1214
1215        futures::future::try_join_all(futures)
1216            .await
1217            .map_err(|e| {
1218                tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
1219                IndexerError::from(e)
1220            })?
1221            .into_iter()
1222            .collect::<Result<Vec<_>, _>>()
1223            .map_err(|e| {
1224                IndexerError::PostgresWrite(format!(
1225                    "Failed to persist all tx indices in a chunk: {e:?}"
1226                ))
1227            })?;
1228        let elapsed = guard.stop_and_record();
1229        info!(elapsed, "Persisted {} chunked tx_indices", len);
1230        Ok(())
1231    }
1232
1233    async fn persist_tx_indices_chunk_v2(
1234        &self,
1235        indices: Vec<TxIndexV2>,
1236    ) -> Result<(), IndexerError> {
1237        let guard = self
1238            .metrics
1239            .checkpoint_db_commit_latency_tx_indices_chunks
1240            .start_timer();
1241        let len = indices.len();
1242
1243        let splits: Vec<TxIndexV2Split> = indices.into_iter().map(|i| i.split()).collect();
1244
1245        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
1246        let recipients: Vec<_> = splits
1247            .iter()
1248            .flat_map(|ix| ix.tx_recipients.clone())
1249            .collect();
1250        let input_objects: Vec<_> = splits
1251            .iter()
1252            .flat_map(|ix| ix.tx_input_objects.clone())
1253            .collect();
1254        let changed_objects: Vec<_> = splits
1255            .iter()
1256            .flat_map(|ix| ix.tx_changed_objects.clone())
1257            .collect();
1258        let wrapped_or_deleted_objects: Vec<_> = splits
1259            .iter()
1260            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
1261            .collect();
1262        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
1263        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
1264        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
1265        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1266        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1267
1268        let futures = [
1269            self.spawn_blocking_task(move |this| {
1270                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1271            }),
1272            self.spawn_blocking_task(move |this| {
1273                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1274            }),
1275            self.spawn_blocking_task(move |this| {
1276                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1277            }),
1278            self.spawn_blocking_task(move |this| {
1279                persist_chunk_into_table!(
1280                    tx_changed_objects::table,
1281                    changed_objects,
1282                    &this.blocking_cp
1283                )
1284            }),
1285            self.spawn_blocking_task(move |this| {
1286                persist_chunk_into_table!(
1287                    tx_wrapped_or_deleted_objects::table,
1288                    wrapped_or_deleted_objects,
1289                    &this.blocking_cp
1290                )
1291            }),
1292            self.spawn_blocking_task(move |this| {
1293                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1294            }),
1295            self.spawn_blocking_task(move |this| {
1296                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1297            }),
1298            self.spawn_blocking_task(move |this| {
1299                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1300            }),
1301            self.spawn_blocking_task(move |this| {
1302                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1303            }),
1304            self.spawn_blocking_task(move |this| {
1305                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1306            }),
1307        ];
1308
1309        futures::future::try_join_all(futures)
1310            .await
1311            .map_err(|e| {
1312                tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
1313                IndexerError::from(e)
1314            })?
1315            .into_iter()
1316            .collect::<Result<Vec<_>, _>>()
1317            .map_err(|e| {
1318                IndexerError::PostgresWrite(format!(
1319                    "Failed to persist all tx indices in a chunk: {e:?}"
1320                ))
1321            })?;
1322        let elapsed = guard.stop_and_record();
1323        info!(elapsed, "Persisted {} chunked tx_indices", len);
1324        Ok(())
1325    }
1326
1327    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1328        let guard = self
1329            .metrics
1330            .checkpoint_db_commit_latency_epoch
1331            .start_timer();
1332        let epoch_id = epoch.new_epoch.epoch;
1333
1334        transactional_blocking_with_retry!(
1335            &self.blocking_cp,
1336            |conn| {
1337                if let Some(last_epoch) = &epoch.last_epoch {
1338                    info!(last_epoch.epoch, "Persisting epoch end data.");
1339                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
1340                        .set(last_epoch)
1341                        .execute(conn)?;
1342                }
1343
1344                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
1345                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
1346                Ok::<(), IndexerError>(())
1347            },
1348            PG_DB_COMMIT_SLEEP_DURATION
1349        )
1350        .tap_ok(|_| {
1351            let elapsed = guard.stop_and_record();
1352            info!(elapsed, epoch_id, "Persisted epoch beginning info");
1353        })
1354        .tap_err(|e| {
1355            tracing::error!("Failed to persist epoch with error: {}", e);
1356        })
1357    }
1358
1359    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1360        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1361        // partition_0 has been created, so no need to advance it.
1362        if let Some(last_epoch_id) = last_epoch_id {
1363            let last_db_epoch: Option<StoredEpochInfo> =
1364                read_only_blocking!(&self.blocking_cp, |conn| {
1365                    epochs::table
1366                        .filter(epochs::epoch.eq(last_epoch_id))
1367                        .first::<StoredEpochInfo>(conn)
1368                        .optional()
1369                })
1370                .context("Failed to read last epoch from PostgresDB")?;
1371            if let Some(last_epoch) = last_db_epoch {
1372                let epoch_partition_data =
1373                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1374                let table_partitions = self.partition_manager.get_table_partitions()?;
1375                for (table, (_, last_partition)) in table_partitions {
1376                    // Only advance epoch partition for epoch partitioned tables.
1377                    if !self
1378                        .partition_manager
1379                        .get_strategy(&table)
1380                        .is_epoch_partitioned()
1381                    {
1382                        continue;
1383                    }
1384                    let guard = self.metrics.advance_epoch_latency.start_timer();
1385                    self.partition_manager.advance_epoch(
1386                        table.clone(),
1387                        last_partition,
1388                        &epoch_partition_data,
1389                    )?;
1390                    let elapsed = guard.stop_and_record();
1391                    info!(
1392                        elapsed,
1393                        "Advanced epoch partition {} for table {}",
1394                        last_partition,
1395                        table.clone()
1396                    );
1397                }
1398            } else {
1399                tracing::error!("Last epoch: {} from PostgresDB is None.", last_epoch_id);
1400            }
1401        }
1402
1403        Ok(())
1404    }
1405
1406    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
1407        transactional_blocking_with_retry!(
1408            &self.blocking_cp,
1409            |conn| {
1410                diesel::delete(
1411                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
1412                )
1413                .execute(conn)
1414                .map_err(IndexerError::from)
1415                .context("Failed to prune checkpoints table")?;
1416
1417                Ok::<(), IndexerError>(())
1418            },
1419            PG_DB_COMMIT_SLEEP_DURATION
1420        )
1421    }
1422
1423    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1424        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1425        transactional_blocking_with_retry!(
1426            &self.blocking_cp,
1427            |conn| {
1428                prune_tx_or_event_indice_table!(
1429                    event_emit_module,
1430                    conn,
1431                    min_tx,
1432                    max_tx,
1433                    "Failed to prune event_emit_module table"
1434                );
1435                prune_tx_or_event_indice_table!(
1436                    event_emit_package,
1437                    conn,
1438                    min_tx,
1439                    max_tx,
1440                    "Failed to prune event_emit_package table"
1441                );
1442                prune_tx_or_event_indice_table![
1443                    event_senders,
1444                    conn,
1445                    min_tx,
1446                    max_tx,
1447                    "Failed to prune event_senders table"
1448                ];
1449                prune_tx_or_event_indice_table![
1450                    event_struct_instantiation,
1451                    conn,
1452                    min_tx,
1453                    max_tx,
1454                    "Failed to prune event_struct_instantiation table"
1455                ];
1456                prune_tx_or_event_indice_table![
1457                    event_struct_module,
1458                    conn,
1459                    min_tx,
1460                    max_tx,
1461                    "Failed to prune event_struct_module table"
1462                ];
1463                prune_tx_or_event_indice_table![
1464                    event_struct_name,
1465                    conn,
1466                    min_tx,
1467                    max_tx,
1468                    "Failed to prune event_struct_name table"
1469                ];
1470                prune_tx_or_event_indice_table![
1471                    event_struct_package,
1472                    conn,
1473                    min_tx,
1474                    max_tx,
1475                    "Failed to prune event_struct_package table"
1476                ];
1477                Ok::<(), IndexerError>(())
1478            },
1479            PG_DB_COMMIT_SLEEP_DURATION
1480        )
1481    }
1482
1483    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1484        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1485        transactional_blocking_with_retry!(
1486            &self.blocking_cp,
1487            |conn| {
1488                prune_tx_or_event_indice_table!(
1489                    tx_senders,
1490                    conn,
1491                    min_tx,
1492                    max_tx,
1493                    "Failed to prune tx_senders table"
1494                );
1495                prune_tx_or_event_indice_table!(
1496                    tx_recipients,
1497                    conn,
1498                    min_tx,
1499                    max_tx,
1500                    "Failed to prune tx_recipients table"
1501                );
1502                prune_tx_or_event_indice_table![
1503                    tx_input_objects,
1504                    conn,
1505                    min_tx,
1506                    max_tx,
1507                    "Failed to prune tx_input_objects table"
1508                ];
1509                prune_tx_or_event_indice_table![
1510                    tx_changed_objects,
1511                    conn,
1512                    min_tx,
1513                    max_tx,
1514                    "Failed to prune tx_changed_objects table"
1515                ];
1516                prune_tx_or_event_indice_table![
1517                    tx_wrapped_or_deleted_objects,
1518                    conn,
1519                    min_tx,
1520                    max_tx,
1521                    "Failed to prune tx_wrapped_or_deleted_objects table"
1522                ];
1523                prune_tx_or_event_indice_table![
1524                    tx_calls_pkg,
1525                    conn,
1526                    min_tx,
1527                    max_tx,
1528                    "Failed to prune tx_calls_pkg table"
1529                ];
1530                prune_tx_or_event_indice_table![
1531                    tx_calls_mod,
1532                    conn,
1533                    min_tx,
1534                    max_tx,
1535                    "Failed to prune tx_calls_mod table"
1536                ];
1537                prune_tx_or_event_indice_table![
1538                    tx_calls_fun,
1539                    conn,
1540                    min_tx,
1541                    max_tx,
1542                    "Failed to prune tx_calls_fun table"
1543                ];
1544                prune_tx_or_event_indice_table![
1545                    tx_digests,
1546                    conn,
1547                    min_tx,
1548                    max_tx,
1549                    "Failed to prune tx_digests table"
1550                ];
1551                Ok::<(), IndexerError>(())
1552            },
1553            PG_DB_COMMIT_SLEEP_DURATION
1554        )
1555    }
1556
1557    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1558        transactional_blocking_with_retry!(
1559            &self.blocking_cp,
1560            |conn| {
1561                diesel::delete(
1562                    pruner_cp_watermark::table
1563                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1564                )
1565                .execute(conn)
1566                .map_err(IndexerError::from)
1567                .context("Failed to prune pruner_cp_watermark table")?;
1568                Ok::<(), IndexerError>(())
1569            },
1570            PG_DB_COMMIT_SLEEP_DURATION
1571        )
1572    }
1573
1574    fn get_network_total_transactions_by_end_of_epoch(
1575        &self,
1576        epoch: u64,
1577    ) -> Result<Option<u64>, IndexerError> {
1578        read_only_blocking!(&self.blocking_cp, |conn| {
1579            epochs::table
1580                .filter(epochs::epoch.eq(epoch as i64))
1581                .select(epochs::network_total_transactions)
1582                .get_result::<Option<i64>>(conn)
1583        })
1584        .context("Failed to get network total transactions in epoch")
1585        .map(|option| option.map(|v| v as u64))
1586    }
1587
1588    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1589        transactional_blocking_with_retry!(
1590            &self.blocking_cp,
1591            |conn| {
1592                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1593                    .execute(conn)?;
1594                Ok::<(), IndexerError>(())
1595            },
1596            PG_DB_COMMIT_SLEEP_DURATION
1597        )
1598        .tap_ok(|_| {
1599            info!("Successfully refreshed participation_metrics");
1600        })
1601        .tap_err(|e| {
1602            tracing::error!("Failed to refresh participation_metrics: {e}");
1603        })
1604    }
1605
1606    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1607    where
1608        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1609        R: Send + 'static,
1610    {
1611        let this = self.clone();
1612        let current_span = tracing::Span::current();
1613        tokio::task::spawn_blocking(move || {
1614            let _guard = current_span.enter();
1615            f(this)
1616        })
1617        .await
1618        .map_err(Into::into)
1619        .and_then(std::convert::identity)
1620    }
1621
1622    fn spawn_blocking_task<F, R>(
1623        &self,
1624        f: F,
1625    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1626    where
1627        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1628        R: Send + 'static,
1629    {
1630        let this = self.clone();
1631        let current_span = tracing::Span::current();
1632        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1633        tokio::task::spawn_blocking(move || {
1634            let _guard = current_span.enter();
1635            let _elapsed = guard.stop_and_record();
1636            f(this)
1637        })
1638    }
1639
1640    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1641    where
1642        F: FnOnce(Self) -> Fut + Send + 'static,
1643        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1644        R: Send + 'static,
1645    {
1646        let this = self.clone();
1647        tokio::task::spawn(async move { f(this).await })
1648    }
1649}
1650
1651#[async_trait]
1652impl IndexerStore for PgIndexerStore {
1653    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1654        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1655            .await
1656    }
1657
1658    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1659        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1660            .await
1661    }
1662
1663    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1664        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1665            .await
1666    }
1667
1668    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1669        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1670            .await
1671    }
1672
1673    async fn get_latest_object_snapshot_checkpoint_sequence_number(
1674        &self,
1675    ) -> Result<Option<u64>, IndexerError> {
1676        self.execute_in_blocking_worker(|this| {
1677            this.get_latest_object_snapshot_checkpoint_sequence_number()
1678        })
1679        .await
1680    }
1681
1682    async fn persist_objects(
1683        &self,
1684        object_changes: Vec<TransactionObjectChangesToCommit>,
1685    ) -> Result<(), IndexerError> {
1686        if object_changes.is_empty() {
1687            return Ok(());
1688        }
1689        let guard = self
1690            .metrics
1691            .checkpoint_db_commit_latency_objects
1692            .start_timer();
1693        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1694        let object_mutations = indexed_mutations
1695            .into_iter()
1696            .map(StoredObject::from)
1697            .collect::<Vec<_>>();
1698        let object_deletions = indexed_deletions
1699            .into_iter()
1700            .map(StoredDeletedObject::from)
1701            .collect::<Vec<_>>();
1702        let mutation_len = object_mutations.len();
1703        let deletion_len = object_deletions.len();
1704
1705        let object_mutation_chunks =
1706            chunk!(object_mutations, self.config.parallel_objects_chunk_size);
1707        let object_deletion_chunks =
1708            chunk!(object_deletions, self.config.parallel_objects_chunk_size);
1709        let mutation_futures = object_mutation_chunks
1710            .into_iter()
1711            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_mutation_chunk(c)));
1712        futures::future::try_join_all(mutation_futures)
1713            .await
1714            .map_err(|e| {
1715                tracing::error!(
1716                    "Failed to join persist_object_mutation_chunk futures: {}",
1717                    e
1718                );
1719                IndexerError::from(e)
1720            })?
1721            .into_iter()
1722            .collect::<Result<Vec<_>, _>>()
1723            .map_err(|e| {
1724                IndexerError::PostgresWrite(format!(
1725                    "Failed to persist all object mutation chunks: {e:?}"
1726                ))
1727            })?;
1728        let deletion_futures = object_deletion_chunks
1729            .into_iter()
1730            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_deletion_chunk(c)));
1731        futures::future::try_join_all(deletion_futures)
1732            .await
1733            .map_err(|e| {
1734                tracing::error!(
1735                    "Failed to join persist_object_deletion_chunk futures: {}",
1736                    e
1737                );
1738                IndexerError::from(e)
1739            })?
1740            .into_iter()
1741            .collect::<Result<Vec<_>, _>>()
1742            .map_err(|e| {
1743                IndexerError::PostgresWrite(format!(
1744                    "Failed to persist all object deletion chunks: {e:?}"
1745                ))
1746            })?;
1747
1748        let elapsed = guard.stop_and_record();
1749        info!(
1750            elapsed,
1751            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
1752        );
1753        Ok(())
1754    }
1755
1756    fn persist_objects_in_existing_transaction(
1757        &self,
1758        conn: &mut PgConnection,
1759        object_changes: Vec<TransactionObjectChangesToCommit>,
1760    ) -> Result<(), IndexerError> {
1761        if object_changes.is_empty() {
1762            return Ok(());
1763        }
1764
1765        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1766        let object_mutations = indexed_mutations
1767            .into_iter()
1768            .map(StoredObject::from)
1769            .collect::<Vec<_>>();
1770        let object_deletions = indexed_deletions
1771            .into_iter()
1772            .map(StoredDeletedObject::from)
1773            .collect::<Vec<_>>();
1774
1775        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1776        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1777
1778        Ok(())
1779    }
1780
1781    async fn persist_objects_snapshot(
1782        &self,
1783        object_changes: Vec<TransactionObjectChangesToCommit>,
1784    ) -> Result<(), IndexerError> {
1785        if object_changes.is_empty() {
1786            return Ok(());
1787        }
1788        let guard = self
1789            .metrics
1790            .checkpoint_db_commit_latency_objects_snapshot
1791            .start_timer();
1792        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1793        let objects_snapshot = indexed_mutations
1794            .into_iter()
1795            .map(StoredObjectSnapshot::from)
1796            .chain(
1797                indexed_deletions
1798                    .into_iter()
1799                    .map(StoredObjectSnapshot::from),
1800            )
1801            .collect::<Vec<_>>();
1802        let len = objects_snapshot.len();
1803        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1804        let futures = chunks
1805            .into_iter()
1806            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1807
1808        futures::future::try_join_all(futures)
1809            .await
1810            .map_err(|e| {
1811                tracing::error!(
1812                    "Failed to join backfill_objects_snapshot_chunk futures: {}",
1813                    e
1814                );
1815                IndexerError::from(e)
1816            })?
1817            .into_iter()
1818            .collect::<Result<Vec<_>, _>>()
1819            .map_err(|e| {
1820                IndexerError::PostgresWrite(format!(
1821                    "Failed to persist all objects snapshot chunks: {e:?}"
1822                ))
1823            })?;
1824        let elapsed = guard.stop_and_record();
1825        info!(elapsed, "Persisted {} objects snapshot", len);
1826        Ok(())
1827    }
1828
1829    async fn persist_object_history(
1830        &self,
1831        object_changes: Vec<TransactionObjectChangesToCommit>,
1832    ) -> Result<(), IndexerError> {
1833        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1834            .map(|val| val.eq_ignore_ascii_case("true"))
1835            .unwrap_or(false);
1836        if skip_history {
1837            info!("skipping object history");
1838            return Ok(());
1839        }
1840
1841        if object_changes.is_empty() {
1842            return Ok(());
1843        }
1844        let objects = make_objects_history_to_commit(object_changes);
1845        let guard = self
1846            .metrics
1847            .checkpoint_db_commit_latency_objects_history
1848            .start_timer();
1849
1850        let len = objects.len();
1851        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1852        let futures = chunks
1853            .into_iter()
1854            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1855
1856        futures::future::try_join_all(futures)
1857            .await
1858            .map_err(|e| {
1859                tracing::error!(
1860                    "Failed to join persist_objects_history_chunk futures: {}",
1861                    e
1862                );
1863                IndexerError::from(e)
1864            })?
1865            .into_iter()
1866            .collect::<Result<Vec<_>, _>>()
1867            .map_err(|e| {
1868                IndexerError::PostgresWrite(format!(
1869                    "Failed to persist all objects history chunks: {e:?}"
1870                ))
1871            })?;
1872        let elapsed = guard.stop_and_record();
1873        info!(elapsed, "Persisted {} objects history", len);
1874        Ok(())
1875    }
1876
1877    async fn persist_object_versions(
1878        &self,
1879        object_versions: Vec<StoredObjectVersion>,
1880    ) -> Result<(), IndexerError> {
1881        if object_versions.is_empty() {
1882            return Ok(());
1883        }
1884
1885        let guard = self
1886            .metrics
1887            .checkpoint_db_commit_latency_objects_version
1888            .start_timer();
1889
1890        let object_versions_count = object_versions.len();
1891
1892        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1893        let futures = chunks
1894            .into_iter()
1895            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1896            .collect::<Vec<_>>();
1897
1898        futures::future::try_join_all(futures)
1899            .await
1900            .map_err(|e| {
1901                tracing::error!("Failed to join persist_object_version_chunk futures: {}", e);
1902                IndexerError::from(e)
1903            })?
1904            .into_iter()
1905            .collect::<Result<Vec<_>, _>>()
1906            .map_err(|e| {
1907                IndexerError::PostgresWrite(format!(
1908                    "Failed to persist all objects version chunks: {e:?}"
1909                ))
1910            })?;
1911        let elapsed = guard.stop_and_record();
1912        info!(elapsed, "Persisted {object_versions_count} object versions");
1913        Ok(())
1914    }
1915
1916    async fn persist_checkpoints(
1917        &self,
1918        checkpoints: Vec<IndexedCheckpoint>,
1919    ) -> Result<(), IndexerError> {
1920        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1921            .await
1922    }
1923
1924    async fn persist_transactions(
1925        &self,
1926        transactions: Vec<IndexedTransaction>,
1927    ) -> Result<(), IndexerError> {
1928        let guard = self
1929            .metrics
1930            .checkpoint_db_commit_latency_transactions
1931            .start_timer();
1932        let len = transactions.len();
1933
1934        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1935        let futures = chunks
1936            .into_iter()
1937            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1938
1939        futures::future::try_join_all(futures)
1940            .await
1941            .map_err(|e| {
1942                tracing::error!("Failed to join persist_transactions_chunk futures: {}", e);
1943                IndexerError::from(e)
1944            })?
1945            .into_iter()
1946            .collect::<Result<Vec<_>, _>>()
1947            .map_err(|e| {
1948                IndexerError::PostgresWrite(format!(
1949                    "Failed to persist all transactions chunks: {e:?}"
1950                ))
1951            })?;
1952        let elapsed = guard.stop_and_record();
1953        info!(elapsed, "Persisted {} transactions", len);
1954        Ok(())
1955    }
1956
1957    fn persist_optimistic_transaction_in_existing_transaction(
1958        &self,
1959        conn: &mut PgConnection,
1960        transaction: OptimisticTransaction,
1961    ) -> Result<(), IndexerError> {
1962        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1963        Ok(())
1964    }
1965
1966    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1967        if events.is_empty() {
1968            return Ok(());
1969        }
1970        let len = events.len();
1971        let guard = self
1972            .metrics
1973            .checkpoint_db_commit_latency_events
1974            .start_timer();
1975        let chunks = chunk!(events, self.config.parallel_chunk_size);
1976        let futures = chunks
1977            .into_iter()
1978            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1979
1980        futures::future::try_join_all(futures)
1981            .await
1982            .map_err(|e| {
1983                tracing::error!("Failed to join persist_events_chunk futures: {}", e);
1984                IndexerError::from(e)
1985            })?
1986            .into_iter()
1987            .collect::<Result<Vec<_>, _>>()
1988            .map_err(|e| {
1989                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
1990            })?;
1991        let elapsed = guard.stop_and_record();
1992        info!(elapsed, "Persisted {} events", len);
1993        Ok(())
1994    }
1995
1996    async fn persist_displays(
1997        &self,
1998        display_updates: BTreeMap<String, StoredDisplay>,
1999    ) -> Result<(), IndexerError> {
2000        if display_updates.is_empty() {
2001            return Ok(());
2002        }
2003
2004        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
2005            .await?
2006    }
2007
2008    fn persist_displays_in_existing_transaction(
2009        &self,
2010        conn: &mut PgConnection,
2011        display_updates: Vec<&StoredDisplay>,
2012    ) -> Result<(), IndexerError> {
2013        if display_updates.is_empty() {
2014            return Ok(());
2015        }
2016
2017        on_conflict_do_update_with_condition!(
2018            display::table,
2019            display_updates,
2020            display::object_type,
2021            (
2022                display::id.eq(excluded(display::id)),
2023                display::version.eq(excluded(display::version)),
2024                display::bcs.eq(excluded(display::bcs)),
2025            ),
2026            excluded(display::version).gt(display::version),
2027            conn
2028        );
2029
2030        Ok(())
2031    }
2032
2033    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
2034        if packages.is_empty() {
2035            return Ok(());
2036        }
2037        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
2038            .await
2039    }
2040
2041    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
2042        if indices.is_empty() {
2043            return Ok(());
2044        }
2045        let len = indices.len();
2046        let guard = self
2047            .metrics
2048            .checkpoint_db_commit_latency_event_indices
2049            .start_timer();
2050        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2051
2052        let futures = chunks.into_iter().map(|chunk| {
2053            self.spawn_task(move |this: Self| async move {
2054                this.persist_event_indices_chunk(chunk).await
2055            })
2056        });
2057
2058        futures::future::try_join_all(futures)
2059            .await
2060            .map_err(|e| {
2061                tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e);
2062                IndexerError::from(e)
2063            })?
2064            .into_iter()
2065            .collect::<Result<Vec<_>, _>>()
2066            .map_err(|e| {
2067                IndexerError::PostgresWrite(format!(
2068                    "Failed to persist all event_indices chunks: {e:?}"
2069                ))
2070            })?;
2071        let elapsed = guard.stop_and_record();
2072        info!(elapsed, "Persisted {} event_indices chunks", len);
2073        Ok(())
2074    }
2075
2076    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2077        if indices.is_empty() {
2078            return Ok(());
2079        }
2080        let len = indices.len();
2081        let guard = self
2082            .metrics
2083            .checkpoint_db_commit_latency_tx_indices
2084            .start_timer();
2085        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2086
2087        let futures = chunks.into_iter().map(|chunk| {
2088            self.spawn_task(
2089                move |this: Self| async move { this.persist_tx_indices_chunk(chunk).await },
2090            )
2091        });
2092        futures::future::try_join_all(futures)
2093            .await
2094            .map_err(|e| {
2095                tracing::error!("Failed to join persist_tx_indices_chunk futures: {}", e);
2096                IndexerError::from(e)
2097            })?
2098            .into_iter()
2099            .collect::<Result<Vec<_>, _>>()
2100            .map_err(|e| {
2101                IndexerError::PostgresWrite(format!(
2102                    "Failed to persist all tx_indices chunks: {e:?}"
2103                ))
2104            })?;
2105        let elapsed = guard.stop_and_record();
2106        info!(elapsed, "Persisted {} tx_indices chunks", len);
2107        Ok(())
2108    }
2109
2110    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2111        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2112            .await
2113    }
2114
2115    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2116        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2117            .await
2118    }
2119
2120    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
2121        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
2122            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
2123            _ => Err(IndexerError::PostgresRead(format!(
2124                "Failed to get checkpoint range for epoch {epoch}"
2125            ))),
2126        }?;
2127
2128        // NOTE: for disaster recovery, min_cp is the min cp of the current epoch, which
2129        // is likely partially pruned already. min_prunable_cp is the min cp to
2130        // be pruned. By std::cmp::max, we will resume the pruning process from
2131        // the next checkpoint, instead of the first cp of the current epoch.
2132        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
2133        min_cp = std::cmp::max(min_cp, min_prunable_cp);
2134        for cp in min_cp..=max_cp {
2135            // NOTE: the order of pruning tables is crucial:
2136            // 1. prune checkpoints table, checkpoints table is the source table of
2137            //    available range,
2138            // we prune it first to make sure that we always have full data for checkpoints
2139            // within the available range;
2140            // 2. then prune tx_* tables;
2141            // 3. then prune pruner_cp_watermark table, which is the checkpoint pruning
2142            //    watermark table and also tx seq source
2143            // of a checkpoint to prune tx_* tables;
2144            // 4. lastly we prune epochs table when all checkpoints of the epoch have been
2145            //    pruned.
2146            info!(
2147                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
2148                cp, epoch, min_prunable_cp
2149            );
2150            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
2151                .await
2152                .unwrap_or_else(|e| {
2153                    tracing::error!("Failed to prune checkpoint {}: {}", cp, e);
2154                });
2155
2156            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
2157            self.execute_in_blocking_worker(move |this| {
2158                this.prune_tx_indices_table(min_tx, max_tx)
2159            })
2160            .await
2161            .unwrap_or_else(|e| {
2162                tracing::error!("Failed to prune transactions for cp {}: {}", cp, e);
2163            });
2164            info!(
2165                "Pruned transactions for checkpoint {} from tx {} to tx {}",
2166                cp, min_tx, max_tx
2167            );
2168            self.execute_in_blocking_worker(move |this| {
2169                this.prune_event_indices_table(min_tx, max_tx)
2170            })
2171            .await
2172            .unwrap_or_else(|e| {
2173                tracing::error!(
2174                    "Failed to prune events of transactions for cp {}: {}",
2175                    cp,
2176                    e
2177                );
2178            });
2179            info!(
2180                "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
2181                cp, min_tx, max_tx
2182            );
2183            self.metrics.last_pruned_transaction.set(max_tx as i64);
2184
2185            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
2186                .await
2187                .unwrap_or_else(|e| {
2188                    tracing::error!(
2189                        "Failed to prune pruner_cp_watermark table for cp {}: {}",
2190                        cp,
2191                        e
2192                    );
2193                });
2194            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2195            self.metrics.last_pruned_checkpoint.set(cp as i64);
2196        }
2197
2198        Ok(())
2199    }
2200
2201    async fn get_network_total_transactions_by_end_of_epoch(
2202        &self,
2203        epoch: u64,
2204    ) -> Result<Option<u64>, IndexerError> {
2205        self.execute_in_blocking_worker(move |this| {
2206            this.get_network_total_transactions_by_end_of_epoch(epoch)
2207        })
2208        .await
2209    }
2210
2211    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2212        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2213            .await
2214    }
2215
2216    fn as_any(&self) -> &dyn StdAny {
2217        self
2218    }
2219
2220    /// Persist protocol configs and feature flags until the protocol version
2221    /// for the latest epoch we have stored in the db, inclusive.
2222    fn persist_protocol_configs_and_feature_flags(
2223        &self,
2224        chain_id: Vec<u8>,
2225    ) -> Result<(), IndexerError> {
2226        let chain_id = ChainIdentifier::from(
2227            CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
2228        );
2229
2230        let mut all_configs = vec![];
2231        let mut all_flags = vec![];
2232
2233        let (start_version, end_version) = self.get_protocol_version_index_range()?;
2234        info!(
2235            "Persisting protocol configs with start_version: {}, end_version: {}",
2236            start_version, end_version
2237        );
2238
2239        // Gather all protocol configs and feature flags for all versions between start
2240        // and end.
2241        for version in start_version..=end_version {
2242            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2243                (version as u64).into(),
2244                chain_id.chain(),
2245            )
2246            .ok_or(IndexerError::Generic(format!(
2247                "Unable to fetch protocol version {} and chain {:?}",
2248                version,
2249                chain_id.chain()
2250            )))?;
2251            let configs_vec = protocol_configs
2252                .attr_map()
2253                .into_iter()
2254                .map(|(k, v)| StoredProtocolConfig {
2255                    protocol_version: version,
2256                    config_name: k,
2257                    config_value: v.map(|v| v.to_string()),
2258                })
2259                .collect::<Vec<_>>();
2260            all_configs.extend(configs_vec);
2261
2262            let feature_flags = protocol_configs
2263                .feature_map()
2264                .into_iter()
2265                .map(|(k, v)| StoredFeatureFlag {
2266                    protocol_version: version,
2267                    flag_name: k,
2268                    flag_value: v,
2269                })
2270                .collect::<Vec<_>>();
2271            all_flags.extend(feature_flags);
2272        }
2273
2274        transactional_blocking_with_retry!(
2275            &self.blocking_cp,
2276            |conn| {
2277                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2278                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2279                }
2280                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
2281                Ok::<(), IndexerError>(())
2282            },
2283            PG_DB_COMMIT_SLEEP_DURATION
2284        )?;
2285        Ok(())
2286    }
2287}
2288#[async_trait]
2289impl IndexerStoreExt for PgIndexerStore {
2290    async fn persist_tx_indices_v2(&self, indices: Vec<TxIndexV2>) -> Result<(), IndexerError> {
2291        if indices.is_empty() {
2292            return Ok(());
2293        }
2294        let len = indices.len();
2295        let guard = self
2296            .metrics
2297            .checkpoint_db_commit_latency_tx_indices
2298            .start_timer();
2299        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2300
2301        let futures = chunks.into_iter().map(|chunk| {
2302            self.spawn_task(move |this: Self| async move {
2303                this.persist_tx_indices_chunk_v2(chunk).await
2304            })
2305        });
2306        futures::future::try_join_all(futures)
2307            .await
2308            .map_err(|e| {
2309                tracing::error!("Failed to join persist_tx_indices_chunk futures: {}", e);
2310                IndexerError::from(e)
2311            })?
2312            .into_iter()
2313            .collect::<Result<Vec<_>, _>>()
2314            .map_err(|e| {
2315                IndexerError::PostgresWrite(format!(
2316                    "Failed to persist all tx_indices chunks: {e:?}"
2317                ))
2318            })?;
2319        let elapsed = guard.stop_and_record();
2320        info!(elapsed, "Persisted {} tx_indices chunks", len);
2321        Ok(())
2322    }
2323
2324    async fn persist_checkpoint_objects(
2325        &self,
2326        objects: Vec<CheckpointObjectChanges>,
2327    ) -> Result<(), IndexerError> {
2328        if objects.is_empty() {
2329            return Ok(());
2330        }
2331        let guard = self
2332            .metrics
2333            .checkpoint_db_commit_latency_objects
2334            .start_timer();
2335        let CheckpointObjectChanges {
2336            changed_objects: mutations,
2337            deleted_objects: deletions,
2338        } = retain_latest_objects_from_checkpoint_batch(objects);
2339        let mutation_len = mutations.len();
2340        let deletion_len = deletions.len();
2341
2342        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
2343        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
2344        let mutation_futures = mutation_chunks
2345            .into_iter()
2346            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
2347        futures::future::try_join_all(mutation_futures)
2348            .await
2349            .map_err(|e| {
2350                tracing::error!(
2351                    "Failed to join persist_object_mutation_chunk futures: {}",
2352                    e
2353                );
2354                IndexerError::from(e)
2355            })?
2356            .into_iter()
2357            .collect::<Result<Vec<_>, _>>()
2358            .map_err(|e| {
2359                IndexerError::PostgresWrite(format!(
2360                    "Failed to persist all object mutation chunks: {e:?}",
2361                ))
2362            })?;
2363        let deletion_futures = deletion_chunks
2364            .into_iter()
2365            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
2366        futures::future::try_join_all(deletion_futures)
2367            .await
2368            .map_err(|e| {
2369                tracing::error!(
2370                    "Failed to join persist_object_deletion_chunk futures: {}",
2371                    e
2372                );
2373                IndexerError::from(e)
2374            })?
2375            .into_iter()
2376            .collect::<Result<Vec<_>, _>>()
2377            .map_err(|e| {
2378                IndexerError::PostgresWrite(format!(
2379                    "Failed to persist all object deletion chunks: {e:?}",
2380                ))
2381            })?;
2382
2383        let elapsed = guard.stop_and_record();
2384        info!(
2385            elapsed,
2386            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
2387        );
2388        Ok(())
2389    }
2390
2391    async fn update_status_for_checkpoint_transactions(
2392        &self,
2393        tx_order: Vec<CheckpointTxGlobalOrder>,
2394    ) -> Result<(), IndexerError> {
2395        let guard = self
2396            .metrics
2397            .checkpoint_db_commit_latency_tx_insertion_order
2398            .start_timer();
2399        let len = tx_order.len();
2400
2401        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2402        let futures = chunks.into_iter().map(|c| {
2403            self.spawn_blocking_task(move |this| {
2404                this.update_status_for_checkpoint_transactions_chunk(c)
2405            })
2406        });
2407
2408        futures::future::try_join_all(futures)
2409            .await
2410            .map_err(|e| {
2411                tracing::error!(
2412                    "Failed to join update_status_for_checkpoint_transactions_chunk futures: {e}",
2413                );
2414                IndexerError::from(e)
2415            })?
2416            .into_iter()
2417            .collect::<Result<Vec<_>, _>>()
2418            .map_err(|e| {
2419                IndexerError::PostgresWrite(format!(
2420                    "Failed to update all `tx_global_order` chunks: {e:?}",
2421                ))
2422            })?;
2423        let elapsed = guard.stop_and_record();
2424        info!(
2425            elapsed,
2426            "Updated index status for {len} txs insertion orders"
2427        );
2428        Ok(())
2429    }
2430
2431    async fn persist_tx_global_order(
2432        &self,
2433        tx_order: Vec<CheckpointTxGlobalOrder>,
2434    ) -> Result<(), IndexerError> {
2435        let guard = self
2436            .metrics
2437            .checkpoint_db_commit_latency_tx_insertion_order
2438            .start_timer();
2439        let len = tx_order.len();
2440
2441        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2442        let futures = chunks
2443            .into_iter()
2444            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));
2445
2446        futures::future::try_join_all(futures)
2447            .await
2448            .map_err(|e| {
2449                tracing::error!("Failed to join persist_tx_global_order_chunk futures: {e}",);
2450                IndexerError::from(e)
2451            })?
2452            .into_iter()
2453            .collect::<Result<Vec<_>, _>>()
2454            .map_err(|e| {
2455                IndexerError::PostgresWrite(format!(
2456                    "Failed to persist all txs insertion order chunks: {e:?}",
2457                ))
2458            })?;
2459        let elapsed = guard.stop_and_record();
2460        info!(elapsed, "Persisted {len} txs insertion orders");
2461        Ok(())
2462    }
2463}
2464
2465fn make_objects_history_to_commit(
2466    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2467) -> Vec<StoredHistoryObject> {
2468    let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2469        .clone()
2470        .into_iter()
2471        .flat_map(|changes| changes.deleted_objects)
2472        .map(|o| o.into())
2473        .collect();
2474    let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2475        .into_iter()
2476        .flat_map(|changes| changes.changed_objects)
2477        .map(|o| o.into())
2478        .collect();
2479    deleted_objects.into_iter().chain(mutated_objects).collect()
2480}
2481
2482/// Partition object changes into deletions and mutations,
2483/// within partition of mutations or deletions, retain the latest with highest
2484/// version; For overlappings of mutations and deletions, only keep one with
2485/// higher version. This is necessary b/c after this step, DB commit will be
2486/// done in parallel and not in order.
2487fn retain_latest_indexed_objects(
2488    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2489) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2490    use std::collections::HashMap;
2491
2492    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
2493    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();
2494
2495    for change in tx_object_changes {
2496        // Remove mutation / deletion with a following deletion / mutation,
2497        // as we expect that following deletion / mutation has a higher version.
2498        // Technically, assertions below are not required, double check just in case.
2499        for mutation in change.changed_objects {
2500            let id = mutation.object.id();
2501            let version = mutation.object.version();
2502
2503            if let Some(existing) = deletions.remove(&id) {
2504                assert!(
2505                    existing.object_version < version.value(),
2506                    "Mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2507                    existing.object_version
2508                );
2509            }
2510
2511            if let Some(existing) = mutations.insert(id, mutation) {
2512                assert!(
2513                    existing.object.version() < version,
2514                    "Mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2515                    existing.object.version()
2516                );
2517            }
2518        }
2519        // Handle deleted objects
2520        for deletion in change.deleted_objects {
2521            let id = deletion.object_id;
2522            let version = deletion.object_version;
2523
2524            if let Some(existing) = mutations.remove(&id) {
2525                assert!(
2526                    existing.object.version().value() < version,
2527                    "Deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2528                    existing.object.version(),
2529                );
2530            }
2531
2532            if let Some(existing) = deletions.insert(id, deletion) {
2533                assert!(
2534                    existing.object_version < version,
2535                    "Deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2536                    existing.object_version
2537                );
2538            }
2539        }
2540    }
2541
2542    (
2543        mutations.into_values().collect(),
2544        deletions.into_values().collect(),
2545    )
2546}