Skip to main content

iota_indexer/store/
pg_indexer_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use core::result::Result::Ok;
6use std::{
7    any::Any as StdAny,
8    collections::{BTreeMap, HashMap},
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use diesel::{
14    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
15    dsl::{max, min, sql},
16    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
17    upsert::excluded,
18};
19use downcast::Any;
20use iota_protocol_config::ProtocolConfig;
21use iota_sdk_types::ObjectId;
22use iota_types::digests::{ChainIdentifier, CheckpointDigest};
23use itertools::Itertools;
24use strum::IntoEnumIterator;
25use tap::TapFallible;
26use tracing::info;
27
28use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
29use crate::{
30    blocking_call_is_ok_or_panic,
31    db::ConnectionPool,
32    errors::{Context, IndexerError},
33    ingestion::{
34        common::{
35            persist::CommitterWatermark,
36            prepare::{
37                CheckpointObjectChanges, LiveObject, RemovedObject,
38                retain_latest_objects_from_checkpoint_batch,
39            },
40        },
41        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
42    },
43    insert_or_ignore_into,
44    metrics::IndexerMetrics,
45    models::{
46        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
47        display::StoredDisplay,
48        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
49        events::StoredEvent,
50        obj_indices::StoredObjectVersion,
51        objects::{
52            StoredBackwardHistoryObject, StoredCheckpointedObject, StoredDeletedObject,
53            StoredObject, StoredObjects,
54        },
55        packages::StoredPackage,
56        transactions::{OptimisticTransaction, StoredTransaction, TxGlobalOrder},
57        tx_indices::TxIndexSplit,
58        watermarks::StoredWatermark,
59    },
60    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
61    persist_chunk_into_table_in_existing_connection,
62    pruning::pruner::PrunableTable,
63    read_only_blocking, run_query, run_query_with_retry,
64    schema::{
65        chain_identifier, checkpointed_objects, checkpoints, display, epochs, event_emit_module,
66        event_emit_package, event_senders, event_struct_instantiation, event_struct_module,
67        event_struct_name, event_struct_package, events, feature_flags, objects,
68        objects_backward_history, objects_version, optimistic_transactions, packages,
69        protocol_configs, pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod,
70        tx_calls_pkg, tx_changed_objects, tx_digests, tx_global_order, tx_input_objects, tx_kinds,
71        tx_recipients, tx_senders, tx_wrapped_or_deleted_objects, watermarks,
72    },
73    store::{IndexerStore, diesel_macro::mark_in_blocking_pool},
74    transactional_blocking_with_retry,
75    types::{
76        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
77        IndexedPackage, IndexedTransaction, TxIndex,
78    },
79};
80
/// Cursor identifying the position of a transaction in the global transaction
/// order, as recorded in the `tx_global_order` table.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TxGlobalOrderCursor {
    /// The `global_sequence_number` component of the position.
    pub global_sequence_number: i64,
    /// The `optimistic_sequence_number` component of the position.
    pub optimistic_sequence_number: i64,
}
88
/// Splits `$items` into consecutive groups of `$group_len` elements and
/// collects them into a `Vec<Vec<_>>`.
///
/// The `.chunks(..)` adaptor is called on the result of `into_iter()`, so an
/// iterator extension trait providing it (presumably `itertools::Itertools`)
/// has to be in scope where the macro is expanded.
#[macro_export]
macro_rules! chunk {
    ($items: expr, $group_len: expr) => {{
        let grouped = $items.into_iter().chunks($group_len);
        grouped
            .into_iter()
            .map(|group| group.collect::<Vec<_>>())
            .collect::<Vec<_>>()
    }};
}
100
/// Deletes every row of `$tbl` whose `tx_sequence_number` is between
/// `$first_tx` and `$last_tx`, running the statement on `$connection`.
///
/// The expansion ends with `?`, so the macro can only be used inside a
/// function that returns a compatible `Result`; `$ctx` is attached as context
/// to any error.
macro_rules! prune_tx_or_event_indice_table {
    ($tbl:ident, $connection:expr, $first_tx:expr, $last_tx:expr, $ctx:expr) => {
        diesel::delete(
            $tbl::table.filter($tbl::tx_sequence_number.between($first_tx, $last_tx)),
        )
        .execute($connection)
        .map_err(IndexerError::from)
        .context($ctx)?;
    };
}
109
/// Number of rows written by a single statement. Within one DB transaction
/// an update may be split into several such statements.
// TODO: I think with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX`
// is now less relevant. We should do experiments and remove it if it's true.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
/// Number of rows written in one DB transaction.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
/// Number of rows written in one DB transaction, for objects specifically.
/// Setting this too high may cause many DB deadlocks because of optimistic
/// locking.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
/// Duration (one hour) handed to the retrying transaction helpers.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(60 * 60);
122
/// Chunk-size settings held by `PgIndexerStore`.
///
/// `PgIndexerStore::new` fills both fields from environment variables of the
/// same (upper-cased, `PG_COMMIT_`-prefixed) names, falling back to the
/// compile-time defaults.
#[derive(Clone, Debug)]
pub struct PgIndexerStoreConfig {
    /// Chunk size read from `PG_COMMIT_PARALLEL_CHUNK_SIZE`.
    pub parallel_chunk_size: usize,
    /// Chunk size read from `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE`.
    pub parallel_objects_chunk_size: usize,
}
128
129pub struct PgIndexerStore {
130    blocking_cp: ConnectionPool,
131    metrics: IndexerMetrics,
132    partition_manager: PgPartitionManager,
133    config: PgIndexerStoreConfig,
134}
135
136impl Clone for PgIndexerStore {
137    fn clone(&self) -> PgIndexerStore {
138        Self {
139            blocking_cp: self.blocking_cp.clone(),
140            metrics: self.metrics.clone(),
141            partition_manager: self.partition_manager.clone(),
142            config: self.config.clone(),
143        }
144    }
145}
146
147impl PgIndexerStore {
148    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
149        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
150            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
151            .parse::<usize>()
152            .unwrap();
153        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
154            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
155            .parse::<usize>()
156            .unwrap();
157        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
158            .expect("failed to initialize partition manager");
159        let config = PgIndexerStoreConfig {
160            parallel_chunk_size,
161            parallel_objects_chunk_size,
162        };
163
164        Self {
165            blocking_cp,
166            metrics,
167            partition_manager,
168            config,
169        }
170    }
171
172    pub fn get_metrics(&self) -> IndexerMetrics {
173        self.metrics.clone()
174    }
175
176    pub fn blocking_cp(&self) -> ConnectionPool {
177        self.blocking_cp.clone()
178    }
179
180    /// Get the range of the protocol versions that need to be indexed.
181    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
182        // We start indexing from the next protocol version after the latest one stored
183        // in the db.
184        let start = read_only_blocking!(&self.blocking_cp, |conn| {
185            protocol_configs::dsl::protocol_configs
186                .select(max(protocol_configs::protocol_version))
187                .first::<Option<i64>>(conn)
188        })
189        .context("Failed reading latest protocol version from PostgresDB")?
190        .map_or(1, |v| v + 1);
191
192        // We end indexing at the protocol version of the latest epoch stored in the db.
193        let end = read_only_blocking!(&self.blocking_cp, |conn| {
194            epochs::dsl::epochs
195                .select(max(epochs::protocol_version))
196                .first::<Option<i64>>(conn)
197        })
198        .context("Failed reading latest epoch protocol version from PostgresDB")?
199        .unwrap_or(1);
200        Ok((start, end))
201    }
202
203    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
204        read_only_blocking!(&self.blocking_cp, |conn| {
205            chain_identifier::dsl::chain_identifier
206                .select(chain_identifier::checkpoint_digest)
207                .first::<Vec<u8>>(conn)
208                .optional()
209        })
210        .context("Failed reading chain id from PostgresDB")
211    }
212
213    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
214        read_only_blocking!(&self.blocking_cp, |conn| {
215            checkpoints::dsl::checkpoints
216                .select(max(checkpoints::sequence_number))
217                .first::<Option<i64>>(conn)
218                .map(|v| v.map(|v| v as u64))
219        })
220        .context("Failed reading latest checkpoint sequence number from PostgresDB")
221    }
222
223    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
224        read_only_blocking!(&self.blocking_cp, |conn| {
225            checkpoints::dsl::checkpoints
226                .select((
227                    min(checkpoints::sequence_number),
228                    max(checkpoints::sequence_number),
229                ))
230                .first::<(Option<i64>, Option<i64>)>(conn)
231                .map(|(min, max)| {
232                    (
233                        min.unwrap_or_default() as u64,
234                        max.unwrap_or_default() as u64,
235                    )
236                })
237        })
238        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
239    }
240
241    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
242        read_only_blocking!(&self.blocking_cp, |conn| {
243            epochs::dsl::epochs
244                .select((min(epochs::epoch), max(epochs::epoch)))
245                .first::<(Option<i64>, Option<i64>)>(conn)
246                .map(|(min, max)| {
247                    (
248                        min.unwrap_or_default() as u64,
249                        max.unwrap_or_default() as u64,
250                    )
251                })
252        })
253        .context("Failed reading min and max epoch numbers from PostgresDB")
254    }
255
256    fn persist_display_updates(
257        &self,
258        display_updates: BTreeMap<String, StoredDisplay>,
259    ) -> Result<(), IndexerError> {
260        transactional_blocking_with_retry!(
261            &self.blocking_cp,
262            {
263                let value = display_updates.values().collect::<Vec<_>>();
264                |conn| self.persist_displays_in_existing_transaction(conn, value)
265            },
266            PG_DB_COMMIT_SLEEP_DURATION
267        )?;
268
269        Ok(())
270    }
271
272    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
273        let guard = self
274            .metrics
275            .checkpoint_db_commit_latency_objects_chunks
276            .start_timer();
277        let len = objects.len();
278        let raw_query = r#"
279            INSERT INTO objects (
280                object_id,
281                object_version,
282                object_digest,
283                owner_type,
284                owner_id,
285                object_type,
286                object_type_package,
287                object_type_module,
288                object_type_name,
289                serialized_object,
290                coin_type,
291                coin_balance,
292                df_kind,
293                finalized_in_cp
294            )
295            SELECT
296                u.object_id,
297                u.object_version,
298                u.object_digest,
299                u.owner_type,
300                u.owner_id,
301                u.object_type,
302                u.object_type_package,
303                u.object_type_module,
304                u.object_type_name,
305                u.serialized_object,
306                u.coin_type,
307                u.coin_balance,
308                u.df_kind,
309                u.finalized_in_cp
310            FROM UNNEST(
311                $1::BYTEA[],
312                $2::BIGINT[],
313                $3::BYTEA[],
314                $4::SMALLINT[],
315                $5::BYTEA[],
316                $6::TEXT[],
317                $7::BYTEA[],
318                $8::TEXT[],
319                $9::TEXT[],
320                $10::BYTEA[],
321                $11::TEXT[],
322                $12::BIGINT[],
323                $13::SMALLINT[],
324                $14::BYTEA[],
325                $15::BIGINT[]
326            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest, finalized_in_cp)
327            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
328            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
329            ON CONFLICT (object_id) DO UPDATE
330            SET
331                object_version = EXCLUDED.object_version,
332                object_digest = EXCLUDED.object_digest,
333                owner_type = EXCLUDED.owner_type,
334                owner_id = EXCLUDED.owner_id,
335                object_type = EXCLUDED.object_type,
336                object_type_package = EXCLUDED.object_type_package,
337                object_type_module = EXCLUDED.object_type_module,
338                object_type_name = EXCLUDED.object_type_name,
339                serialized_object = EXCLUDED.serialized_object,
340                coin_type = EXCLUDED.coin_type,
341                coin_balance = EXCLUDED.coin_balance,
342                df_kind = EXCLUDED.df_kind,
343                finalized_in_cp = EXCLUDED.finalized_in_cp
344            WHERE EXCLUDED.object_version > objects.object_version
345        "#;
346        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
347            .into_iter()
348            .map(LiveObject::split)
349            .map(|(indexed_object, tx_digest)| {
350                (
351                    StoredObject::from(indexed_object),
352                    tx_digest.into_inner().to_vec(),
353                )
354            })
355            .unzip();
356        let query = diesel::sql_query(raw_query)
357            .bind::<Array<Bytea>, _>(objects.object_ids)
358            .bind::<Array<BigInt>, _>(objects.object_versions)
359            .bind::<Array<Bytea>, _>(objects.object_digests)
360            .bind::<Array<SmallInt>, _>(objects.owner_types)
361            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
362            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
363            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
364            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
365            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
366            .bind::<Array<Bytea>, _>(objects.serialized_objects)
367            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
368            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
369            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
370            .bind::<Array<Bytea>, _>(tx_digests)
371            .bind::<Array<Nullable<BigInt>>, _>(objects.finalized_in_cps);
372        transactional_blocking_with_retry!(
373            &self.blocking_cp,
374            |conn| {
375                query.clone().execute(conn)?;
376                Ok::<(), IndexerError>(())
377            },
378            PG_DB_COMMIT_SLEEP_DURATION
379        )
380        .tap_ok(|_| {
381            let elapsed = guard.stop_and_record();
382            info!(elapsed, "Persisted {len} chunked objects");
383        })
384        .tap_err(|e| {
385            tracing::error!("failed to persist object mutations with error: {e}");
386        })
387    }
388
389    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
390        let guard = self
391            .metrics
392            .checkpoint_db_commit_latency_objects_chunks
393            .start_timer();
394        let len = objects.len();
395        let raw_query = r#"
396            DELETE FROM objects
397            USING (
398                SELECT u.object_id, u.object_version
399                FROM UNNEST(
400                    $1::BYTEA[],
401                    $2::BIGINT[],
402                    $3::BYTEA[]
403                ) AS u(object_id, object_version, tx_digest)
404                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
405                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
406            ) AS to_delete
407            WHERE objects.object_id = to_delete.object_id
408            AND objects.object_version < to_delete.object_version
409        "#;
410        let (object_ids, versions, tx_digests): (Vec<_>, Vec<_>, Vec<_>) = objects
411            .into_iter()
412            .map(|removed_object| {
413                (
414                    removed_object.object_id().as_bytes().to_vec(),
415                    removed_object.version() as i64,
416                    removed_object.transaction_digest.into_inner().to_vec(),
417                )
418            })
419            .multiunzip();
420        let query = diesel::sql_query(raw_query)
421            .bind::<Array<Bytea>, _>(object_ids)
422            .bind::<Array<BigInt>, _>(versions)
423            .bind::<Array<Bytea>, _>(tx_digests);
424        transactional_blocking_with_retry!(
425            &self.blocking_cp,
426            |conn| {
427                query.clone().execute(conn)?;
428                Ok::<(), IndexerError>(())
429            },
430            PG_DB_COMMIT_SLEEP_DURATION
431        )
432        .tap_ok(|_| {
433            let elapsed = guard.stop_and_record();
434            info!(elapsed, "Deleted {len} chunked objects");
435        })
436        .tap_err(|e| {
437            tracing::error!("failed to persist object deletions with error: {e}");
438        })
439    }
440
441    fn persist_checkpointed_objects_chunk(
442        &self,
443        objects: Vec<StoredCheckpointedObject>,
444    ) -> Result<(), IndexerError> {
445        use diesel::upsert::excluded;
446
447        let len = objects.len();
448        transactional_blocking_with_retry!(
449            &self.blocking_cp,
450            |conn| {
451                for chunk in objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
452                    on_conflict_do_update!(
453                        checkpointed_objects::table,
454                        chunk,
455                        checkpointed_objects::object_id,
456                        (
457                            checkpointed_objects::object_version
458                                .eq(excluded(checkpointed_objects::object_version)),
459                            checkpointed_objects::object_status
460                                .eq(excluded(checkpointed_objects::object_status)),
461                            checkpointed_objects::object_digest
462                                .eq(excluded(checkpointed_objects::object_digest)),
463                            checkpointed_objects::checkpoint_sequence_number
464                                .eq(excluded(checkpointed_objects::checkpoint_sequence_number)),
465                            checkpointed_objects::owner_type
466                                .eq(excluded(checkpointed_objects::owner_type)),
467                            checkpointed_objects::owner_id
468                                .eq(excluded(checkpointed_objects::owner_id)),
469                            checkpointed_objects::object_type
470                                .eq(excluded(checkpointed_objects::object_type)),
471                            checkpointed_objects::object_type_package
472                                .eq(excluded(checkpointed_objects::object_type_package)),
473                            checkpointed_objects::object_type_module
474                                .eq(excluded(checkpointed_objects::object_type_module)),
475                            checkpointed_objects::object_type_name
476                                .eq(excluded(checkpointed_objects::object_type_name)),
477                            checkpointed_objects::serialized_object
478                                .eq(excluded(checkpointed_objects::serialized_object)),
479                            checkpointed_objects::coin_type
480                                .eq(excluded(checkpointed_objects::coin_type)),
481                            checkpointed_objects::coin_balance
482                                .eq(excluded(checkpointed_objects::coin_balance)),
483                            checkpointed_objects::df_kind
484                                .eq(excluded(checkpointed_objects::df_kind)),
485                        ),
486                        conn
487                    );
488                }
489                Ok::<(), IndexerError>(())
490            },
491            PG_DB_COMMIT_SLEEP_DURATION
492        )
493        .tap_ok(|_| {
494            info!("Persisted {len} checkpointed objects");
495        })
496        .tap_err(|e| {
497            tracing::error!("failed to persist checkpointed objects: {e}");
498        })
499    }
500
501    fn persist_object_mutation_chunk_in_existing_transaction(
502        &self,
503        conn: &mut PgConnection,
504        mutated_object_mutation_chunk: Vec<StoredObject>,
505    ) -> Result<(), IndexerError> {
506        on_conflict_do_update_with_condition!(
507            objects::table,
508            mutated_object_mutation_chunk,
509            objects::object_id,
510            (
511                objects::object_id.eq(excluded(objects::object_id)),
512                objects::object_version.eq(excluded(objects::object_version)),
513                objects::object_digest.eq(excluded(objects::object_digest)),
514                objects::owner_type.eq(excluded(objects::owner_type)),
515                objects::owner_id.eq(excluded(objects::owner_id)),
516                objects::object_type.eq(excluded(objects::object_type)),
517                objects::serialized_object.eq(excluded(objects::serialized_object)),
518                objects::coin_type.eq(excluded(objects::coin_type)),
519                objects::coin_balance.eq(excluded(objects::coin_balance)),
520                objects::df_kind.eq(excluded(objects::df_kind)),
521                objects::finalized_in_cp.eq(excluded(objects::finalized_in_cp)),
522            ),
523            excluded(objects::object_version).gt(objects::object_version),
524            conn
525        );
526        Ok::<(), IndexerError>(())
527    }
528
529    fn persist_object_deletion_chunk_in_existing_transaction(
530        &self,
531        conn: &mut PgConnection,
532        deleted_objects_chunk: Vec<StoredDeletedObject>,
533    ) -> Result<(), IndexerError> {
534        let (object_ids, object_versions): (Vec<_>, Vec<_>) = deleted_objects_chunk
535            .iter()
536            .map(|o| (o.object_id.clone(), o.object_version))
537            .unzip();
538        let raw_query = r#"
539            DELETE FROM objects
540            USING UNNEST($1::BYTEA[], $2::BIGINT[]) AS to_delete(object_id, object_version)
541            WHERE objects.object_id = to_delete.object_id
542            AND objects.object_version < to_delete.object_version
543        "#;
544        diesel::sql_query(raw_query)
545            .bind::<Array<Bytea>, _>(object_ids)
546            .bind::<Array<BigInt>, _>(object_versions)
547            .execute(conn)
548            .map_err(IndexerError::from)
549            .context("Failed to write object deletion to PostgresDB")?;
550        Ok::<(), IndexerError>(())
551    }
552
553    fn persist_objects_backward_history_chunk(
554        &self,
555        stored: Vec<StoredBackwardHistoryObject>,
556    ) -> Result<(), IndexerError> {
557        persist_chunk_into_table!(objects_backward_history::table, stored, &self.blocking_cp)
558    }
559
560    fn persist_object_version_chunk(
561        &self,
562        object_versions: Vec<StoredObjectVersion>,
563    ) -> Result<(), IndexerError> {
564        let guard = self
565            .metrics
566            .checkpoint_db_commit_latency_objects_version_chunks
567            .start_timer();
568
569        transactional_blocking_with_retry!(
570            &self.blocking_cp,
571            |conn| {
572                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
573                {
574                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
575                }
576                Ok::<(), IndexerError>(())
577            },
578            PG_DB_COMMIT_SLEEP_DURATION
579        )
580        .tap_ok(|_| {
581            let elapsed = guard.stop_and_record();
582            info!(
583                elapsed,
584                "Persisted {} chunked object versions",
585                object_versions.len(),
586            );
587        })
588        .tap_err(|e| {
589            tracing::error!("failed to persist object versions with error: {e}");
590        })
591    }
592
593    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
594        let Some(first_checkpoint) = checkpoints.first() else {
595            return Ok(());
596        };
597
598        // If the first checkpoint has sequence number 0, we need to persist the digest
599        // as chain identifier.
600        if first_checkpoint.sequence_number == 0 {
601            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
602            self.persist_protocol_configs_and_feature_flags(checkpoint_digest)?;
603            transactional_blocking_with_retry!(
604                &self.blocking_cp,
605                |conn| {
606                    let checkpoint_digest =
607                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
608                    insert_or_ignore_into!(
609                        chain_identifier::table,
610                        StoredChainIdentifier { checkpoint_digest },
611                        conn
612                    );
613                    Ok::<(), IndexerError>(())
614                },
615                PG_DB_COMMIT_SLEEP_DURATION
616            )?;
617        }
618        let guard = self
619            .metrics
620            .checkpoint_db_commit_latency_checkpoints
621            .start_timer();
622
623        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
624        transactional_blocking_with_retry!(
625            &self.blocking_cp,
626            |conn| {
627                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
628                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
629                }
630                Ok::<(), IndexerError>(())
631            },
632            PG_DB_COMMIT_SLEEP_DURATION
633        )
634        .tap_ok(|_| {
635            info!(
636                "Persisted {} pruner_cp_watermark rows.",
637                stored_cp_txs.len(),
638            );
639        })
640        .tap_err(|e| {
641            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
642        })?;
643
644        let stored_checkpoints = checkpoints
645            .iter()
646            .map(StoredCheckpoint::from)
647            .collect::<Vec<_>>();
648        transactional_blocking_with_retry!(
649            &self.blocking_cp,
650            |conn| {
651                for stored_checkpoint_chunk in
652                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
653                {
654                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
655                    let time_now_ms = chrono::Utc::now().timestamp_millis();
656                    for stored_checkpoint in stored_checkpoint_chunk {
657                        self.metrics
658                            .db_commit_lag_ms
659                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
660                        self.metrics.max_committed_checkpoint_sequence_number.set(
661                            stored_checkpoint.sequence_number,
662                        );
663                        self.metrics.committed_checkpoint_timestamp_ms.set(
664                            stored_checkpoint.timestamp_ms,
665                        );
666                    }
667                    for stored_checkpoint in stored_checkpoint_chunk {
668                        info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
669                    }
670                }
671                Ok::<(), IndexerError>(())
672            },
673            PG_DB_COMMIT_SLEEP_DURATION
674        )
675        .tap_ok(|_| {
676            let elapsed = guard.stop_and_record();
677            info!(
678                elapsed,
679                "Persisted {} checkpoints",
680                stored_checkpoints.len()
681            );
682        })
683        .tap_err(|e| {
684            tracing::error!("failed to persist checkpoints with error: {e}");
685        })
686    }
687
688    fn persist_transactions_chunk(
689        &self,
690        transactions: Vec<IndexedTransaction>,
691    ) -> Result<(), IndexerError> {
692        let guard = self
693            .metrics
694            .checkpoint_db_commit_latency_transactions_chunks
695            .start_timer();
696        let transformation_guard = self
697            .metrics
698            .checkpoint_db_commit_latency_transactions_chunks_transformation
699            .start_timer();
700        let transactions = transactions
701            .iter()
702            .map(StoredTransaction::from)
703            .collect::<Vec<_>>();
704        drop(transformation_guard);
705
706        transactional_blocking_with_retry!(
707            &self.blocking_cp,
708            |conn| {
709                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
710                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
711                }
712                Ok::<(), IndexerError>(())
713            },
714            PG_DB_COMMIT_SLEEP_DURATION
715        )
716        .tap_ok(|_| {
717            let elapsed = guard.stop_and_record();
718            info!(
719                elapsed,
720                "Persisted {} chunked transactions",
721                transactions.len()
722            );
723        })
724        .tap_err(|e| {
725            tracing::error!("failed to persist transactions with error: {e}");
726        })
727    }
728
729    fn persist_tx_global_order_chunk(
730        &self,
731        tx_order: Vec<TxGlobalOrder>,
732    ) -> Result<(), IndexerError> {
733        let guard = self
734            .metrics
735            .checkpoint_db_commit_latency_tx_insertion_order_chunks
736            .start_timer();
737
738        transactional_blocking_with_retry!(
739            &self.blocking_cp,
740            |conn| {
741                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
742                    // Upsert: on conflict (row already inserted by optimistic path),
743                    // set `chk_tx_sequence_number` so checkpoint data is available
744                    // immediately.
745                    on_conflict_do_update_with_condition!(
746                        tx_global_order::table,
747                        tx_order_chunk,
748                        tx_global_order::tx_digest,
749                        tx_global_order::chk_tx_sequence_number
750                            .eq(excluded(tx_global_order::chk_tx_sequence_number)),
751                        tx_global_order::chk_tx_sequence_number.is_null(),
752                        conn
753                    );
754                }
755                Ok::<(), IndexerError>(())
756            },
757            PG_DB_COMMIT_SLEEP_DURATION
758        )
759        .tap_ok(|_| {
760            let elapsed = guard.stop_and_record();
761            info!(
762                elapsed,
763                "Persisted {} chunked txs insertion order",
764                tx_order.len()
765            );
766        })
767        .tap_err(|e| {
768            tracing::error!("failed to persist txs insertion order with error: {e}");
769        })
770    }
771
772    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
773        let guard = self
774            .metrics
775            .checkpoint_db_commit_latency_events_chunks
776            .start_timer();
777        let len = events.len();
778        let events = events
779            .into_iter()
780            .map(StoredEvent::from)
781            .collect::<Vec<_>>();
782
783        transactional_blocking_with_retry!(
784            &self.blocking_cp,
785            |conn| {
786                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
787                    insert_or_ignore_into!(events::table, event_chunk, conn);
788                }
789                Ok::<(), IndexerError>(())
790            },
791            PG_DB_COMMIT_SLEEP_DURATION
792        )
793        .tap_ok(|_| {
794            let elapsed = guard.stop_and_record();
795            info!(elapsed, "Persisted {} chunked events", len);
796        })
797        .tap_err(|e| {
798            tracing::error!("failed to persist events with error: {e}");
799        })
800    }
801
802    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
803        if packages.is_empty() {
804            return Ok(());
805        }
806        let guard = self
807            .metrics
808            .checkpoint_db_commit_latency_packages
809            .start_timer();
810        let packages = packages
811            .into_iter()
812            .map(StoredPackage::from)
813            .collect::<Vec<_>>();
814        transactional_blocking_with_retry!(
815            &self.blocking_cp,
816            |conn| {
817                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
818                    on_conflict_do_update!(
819                        packages::table,
820                        packages_chunk,
821                        packages::package_id,
822                        (
823                            packages::package_id.eq(excluded(packages::package_id)),
824                            packages::move_package.eq(excluded(packages::move_package)),
825                        ),
826                        conn
827                    );
828                }
829                Ok::<(), IndexerError>(())
830            },
831            PG_DB_COMMIT_SLEEP_DURATION
832        )
833        .tap_ok(|_| {
834            let elapsed = guard.stop_and_record();
835            info!(elapsed, "Persisted {} packages", packages.len());
836        })
837        .tap_err(|e| {
838            tracing::error!("failed to persist packages with error: {e}");
839        })
840    }
841
842    async fn persist_event_indices_chunk(
843        &self,
844        indices: Vec<EventIndex>,
845    ) -> Result<(), IndexerError> {
846        let guard = self
847            .metrics
848            .checkpoint_db_commit_latency_event_indices_chunks
849            .start_timer();
850        let len = indices.len();
851        let (
852            event_emit_packages,
853            event_emit_modules,
854            event_senders,
855            event_struct_packages,
856            event_struct_modules,
857            event_struct_names,
858            event_struct_instantiations,
859        ) = indices.into_iter().map(|i| i.split()).fold(
860            (
861                Vec::new(),
862                Vec::new(),
863                Vec::new(),
864                Vec::new(),
865                Vec::new(),
866                Vec::new(),
867                Vec::new(),
868            ),
869            |(
870                mut event_emit_packages,
871                mut event_emit_modules,
872                mut event_senders,
873                mut event_struct_packages,
874                mut event_struct_modules,
875                mut event_struct_names,
876                mut event_struct_instantiations,
877            ),
878             index| {
879                event_emit_packages.push(index.0);
880                event_emit_modules.push(index.1);
881                event_senders.push(index.2);
882                event_struct_packages.push(index.3);
883                event_struct_modules.push(index.4);
884                event_struct_names.push(index.5);
885                event_struct_instantiations.push(index.6);
886                (
887                    event_emit_packages,
888                    event_emit_modules,
889                    event_senders,
890                    event_struct_packages,
891                    event_struct_modules,
892                    event_struct_names,
893                    event_struct_instantiations,
894                )
895            },
896        );
897
898        // Now persist all the event indices in parallel into their tables.
899        let mut futures = vec![];
900        futures.push(self.spawn_blocking_task(move |this| {
901            persist_chunk_into_table!(
902                event_emit_package::table,
903                event_emit_packages,
904                &this.blocking_cp
905            )
906        }));
907
908        futures.push(self.spawn_blocking_task(move |this| {
909            persist_chunk_into_table!(
910                event_emit_module::table,
911                event_emit_modules,
912                &this.blocking_cp
913            )
914        }));
915
916        futures.push(self.spawn_blocking_task(move |this| {
917            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
918        }));
919
920        futures.push(self.spawn_blocking_task(move |this| {
921            persist_chunk_into_table!(
922                event_struct_package::table,
923                event_struct_packages,
924                &this.blocking_cp
925            )
926        }));
927
928        futures.push(self.spawn_blocking_task(move |this| {
929            persist_chunk_into_table!(
930                event_struct_module::table,
931                event_struct_modules,
932                &this.blocking_cp
933            )
934        }));
935
936        futures.push(self.spawn_blocking_task(move |this| {
937            persist_chunk_into_table!(
938                event_struct_name::table,
939                event_struct_names,
940                &this.blocking_cp
941            )
942        }));
943
944        futures.push(self.spawn_blocking_task(move |this| {
945            persist_chunk_into_table!(
946                event_struct_instantiation::table,
947                event_struct_instantiations,
948                &this.blocking_cp
949            )
950        }));
951
952        futures::future::try_join_all(futures)
953            .await
954            .map_err(|e| {
955                tracing::error!("failed to join event indices futures in a chunk: {e}");
956                IndexerError::from(e)
957            })?
958            .into_iter()
959            .collect::<Result<Vec<_>, _>>()
960            .map_err(|e| {
961                IndexerError::PostgresWrite(format!(
962                    "Failed to persist all event indices in a chunk: {e:?}"
963                ))
964            })?;
965        let elapsed = guard.stop_and_record();
966        info!(elapsed, "Persisted {} chunked event indices", len);
967        Ok(())
968    }
969
970    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
971        let guard = self
972            .metrics
973            .checkpoint_db_commit_latency_tx_indices_chunks
974            .start_timer();
975        let len = indices.len();
976
977        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();
978
979        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
980        let recipients: Vec<_> = splits
981            .iter()
982            .flat_map(|ix| ix.tx_recipients.clone())
983            .collect();
984        let input_objects: Vec<_> = splits
985            .iter()
986            .flat_map(|ix| ix.tx_input_objects.clone())
987            .collect();
988        let changed_objects: Vec<_> = splits
989            .iter()
990            .flat_map(|ix| ix.tx_changed_objects.clone())
991            .collect();
992        let wrapped_or_deleted_objects: Vec<_> = splits
993            .iter()
994            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
995            .collect();
996        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
997        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
998        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
999        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1000        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1001
1002        let futures = [
1003            self.spawn_blocking_task(move |this| {
1004                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1005            }),
1006            self.spawn_blocking_task(move |this| {
1007                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1008            }),
1009            self.spawn_blocking_task(move |this| {
1010                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1011            }),
1012            self.spawn_blocking_task(move |this| {
1013                persist_chunk_into_table!(
1014                    tx_changed_objects::table,
1015                    changed_objects,
1016                    &this.blocking_cp
1017                )
1018            }),
1019            self.spawn_blocking_task(move |this| {
1020                persist_chunk_into_table!(
1021                    tx_wrapped_or_deleted_objects::table,
1022                    wrapped_or_deleted_objects,
1023                    &this.blocking_cp
1024                )
1025            }),
1026            self.spawn_blocking_task(move |this| {
1027                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1028            }),
1029            self.spawn_blocking_task(move |this| {
1030                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1031            }),
1032            self.spawn_blocking_task(move |this| {
1033                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1034            }),
1035            self.spawn_blocking_task(move |this| {
1036                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1037            }),
1038            self.spawn_blocking_task(move |this| {
1039                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1040            }),
1041        ];
1042
1043        futures::future::try_join_all(futures)
1044            .await
1045            .map_err(|e| {
1046                tracing::error!("failed to join tx indices futures in a chunk: {e}");
1047                IndexerError::from(e)
1048            })?
1049            .into_iter()
1050            .collect::<Result<Vec<_>, _>>()
1051            .map_err(|e| {
1052                IndexerError::PostgresWrite(format!(
1053                    "Failed to persist all tx indices in a chunk: {e:?}"
1054                ))
1055            })?;
1056        let elapsed = guard.stop_and_record();
1057        info!(elapsed, "Persisted {} chunked tx_indices", len);
1058        Ok(())
1059    }
1060
1061    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1062        let guard = self
1063            .metrics
1064            .checkpoint_db_commit_latency_epoch
1065            .start_timer();
1066        let epoch_id = epoch.new_epoch.epoch;
1067
1068        transactional_blocking_with_retry!(
1069            &self.blocking_cp,
1070            |conn| {
1071                if let Some(last_epoch) = &epoch.last_epoch {
1072                    info!(last_epoch.epoch, "Persisting epoch end data.");
1073                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
1074                        .set(last_epoch)
1075                        .execute(conn)?;
1076                }
1077
1078                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
1079                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
1080                Ok::<(), IndexerError>(())
1081            },
1082            PG_DB_COMMIT_SLEEP_DURATION
1083        )
1084        .tap_ok(|_| {
1085            let elapsed = guard.stop_and_record();
1086            info!(elapsed, epoch_id, "Persisted epoch beginning info");
1087        })
1088        .tap_err(|e| {
1089            tracing::error!("failed to persist epoch with error: {e}");
1090        })
1091    }
1092
1093    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1094        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1095        // partition_0 has been created, so no need to advance it.
1096        if let Some(last_epoch_id) = last_epoch_id {
1097            let last_db_epoch: Option<StoredEpochInfo> =
1098                read_only_blocking!(&self.blocking_cp, |conn| {
1099                    epochs::table
1100                        .filter(epochs::epoch.eq(last_epoch_id))
1101                        .first::<StoredEpochInfo>(conn)
1102                        .optional()
1103                })
1104                .context("Failed to read last epoch from PostgresDB")?;
1105            if let Some(last_epoch) = last_db_epoch {
1106                let epoch_partition_data =
1107                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1108                let table_partitions = self.partition_manager.get_table_partitions()?;
1109                for (table, (_, last_partition)) in table_partitions {
1110                    // Only advance epoch partition for epoch partitioned tables.
1111                    if !self
1112                        .partition_manager
1113                        .get_strategy(&table)
1114                        .is_epoch_partitioned()
1115                    {
1116                        continue;
1117                    }
1118                    let guard = self.metrics.advance_epoch_latency.start_timer();
1119                    self.partition_manager.advance_epoch(
1120                        table.clone(),
1121                        last_partition,
1122                        &epoch_partition_data,
1123                    )?;
1124                    let elapsed = guard.stop_and_record();
1125                    info!(
1126                        elapsed,
1127                        "Advanced epoch partition {} for table {}",
1128                        last_partition,
1129                        table.clone()
1130                    );
1131                }
1132            } else {
1133                tracing::error!("last epoch: {last_epoch_id} from PostgresDB is None.");
1134            }
1135        }
1136
1137        Ok(())
1138    }
1139
1140    fn prune_checkpoints_table_by_range(
1141        &self,
1142        min_cp: u64,
1143        max_cp: u64,
1144    ) -> Result<(), IndexerError> {
1145        transactional_blocking_with_retry!(
1146            &self.blocking_cp,
1147            |conn| {
1148                diesel::delete(
1149                    checkpoints::table
1150                        .filter(checkpoints::sequence_number.between(min_cp as i64, max_cp as i64)),
1151                )
1152                .execute(conn)
1153                .map_err(IndexerError::from)
1154                .context("Failed to prune checkpoints table by range")?;
1155
1156                Ok::<(), IndexerError>(())
1157            },
1158            PG_DB_COMMIT_SLEEP_DURATION
1159        )
1160    }
1161
1162    /// Prunes tx_global_order table by transaction range using
1163    /// chk_tx_sequence_number
1164    fn prune_tx_global_order(
1165        &self,
1166        conn: &mut PgConnection,
1167        min_tx: i64,
1168        max_tx: i64,
1169    ) -> Result<(), IndexerError> {
1170        diesel::delete(
1171            tx_global_order::table
1172                .filter(tx_global_order::chk_tx_sequence_number.between(min_tx, max_tx)),
1173        )
1174        .execute(conn)
1175        .map_err(IndexerError::from)
1176        .context("Failed to prune tx_global_order table")
1177        .map(|_| ())
1178    }
1179
    /// Prunes a single transaction or event index table by transaction range.
    ///
    /// `table` selects which index table is pruned; `min_tx` and `max_tx` are
    /// the bounds of the transaction sequence range and are cast to `i64`
    /// before use. All event and transaction index tables are pruned through
    /// `prune_tx_or_event_indice_table!`, while `TxGlobalOrder` is delegated
    /// to `prune_tx_global_order`.
    ///
    /// # Errors
    ///
    /// Returns [`IndexerError::InvalidArgument`] when `table` is not one of
    /// the transaction or event index tables handled here, and propagates any
    /// error raised while pruning.
    fn prune_single_tx_or_event_table(
        &self,
        table: &crate::pruning::pruner::PrunableTable,
        min_tx: u64,
        max_tx: u64,
    ) -> Result<(), IndexerError> {
        use crate::pruning::pruner::PrunableTable;

        // The pruning helpers below take the bounds as `i64`.
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                match table {
                    // Event index tables
                    PrunableTable::EventEmitModule => {
                        prune_tx_or_event_indice_table!(
                            event_emit_module,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_emit_module table"
                        );
                    }
                    PrunableTable::EventEmitPackage => {
                        prune_tx_or_event_indice_table!(
                            event_emit_package,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_emit_package table"
                        );
                    }
                    PrunableTable::EventSenders => {
                        prune_tx_or_event_indice_table!(
                            event_senders,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_senders table"
                        );
                    }
                    PrunableTable::EventStructInstantiation => {
                        prune_tx_or_event_indice_table!(
                            event_struct_instantiation,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_instantiation table"
                        );
                    }
                    PrunableTable::EventStructModule => {
                        prune_tx_or_event_indice_table!(
                            event_struct_module,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_module table"
                        );
                    }
                    PrunableTable::EventStructName => {
                        prune_tx_or_event_indice_table!(
                            event_struct_name,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_name table"
                        );
                    }
                    PrunableTable::EventStructPackage => {
                        prune_tx_or_event_indice_table!(
                            event_struct_package,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_package table"
                        );
                    }

                    // Transaction index tables
                    PrunableTable::TxSenders => {
                        prune_tx_or_event_indice_table!(
                            tx_senders,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_senders table"
                        );
                    }
                    PrunableTable::TxRecipients => {
                        prune_tx_or_event_indice_table!(
                            tx_recipients,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_recipients table"
                        );
                    }
                    PrunableTable::TxInputObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_input_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_input_objects table"
                        );
                    }
                    PrunableTable::TxChangedObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_changed_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_changed_objects table"
                        );
                    }
                    PrunableTable::TxWrappedOrDeletedObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_wrapped_or_deleted_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_wrapped_or_deleted_objects table"
                        );
                    }
                    PrunableTable::TxCallsPkg => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_pkg,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_pkg table"
                        );
                    }
                    PrunableTable::TxCallsMod => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_mod,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_mod table"
                        );
                    }
                    PrunableTable::TxCallsFun => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_fun,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_fun table"
                        );
                    }
                    PrunableTable::TxDigests => {
                        prune_tx_or_event_indice_table!(
                            tx_digests,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_digests table"
                        );
                    }
                    PrunableTable::TxKinds => {
                        prune_tx_or_event_indice_table!(
                            tx_kinds,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_kinds table"
                        );
                    }
                    // Pruned by `chk_tx_sequence_number` through a dedicated
                    // helper rather than the shared macro.
                    PrunableTable::TxGlobalOrder => {
                        self.prune_tx_global_order(conn, min_tx, max_tx)?;
                    }
                    // Any other prunable table is not handled by this method.
                    _ => {
                        return Err(IndexerError::InvalidArgument(format!(
                            "table {} is not a transaction or event index table",
                            table.as_ref()
                        )));
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1366
1367    /// Prune optimistic_transactions table by global_sequence_number range.
1368    /// Prunes at most `limit` rows and returns the number of rows deleted.
1369    fn prune_optimistic_tx_by_global_seq(
1370        &self,
1371        start: u64,
1372        end: u64,
1373        limit: i64,
1374    ) -> Result<usize, IndexerError> {
1375        use diesel::prelude::*;
1376
1377        transactional_blocking_with_retry!(
1378            &self.blocking_cp,
1379            |conn| {
1380                let sql = r#"
1381                    WITH ids_to_delete AS (
1382                         SELECT global_sequence_number, optimistic_sequence_number
1383                         FROM optimistic_transactions
1384                         WHERE global_sequence_number BETWEEN $1 AND $2
1385                         ORDER BY global_sequence_number, optimistic_sequence_number
1386                         FOR UPDATE
1387                         LIMIT $3
1388                     )
1389                     DELETE FROM optimistic_transactions otx
1390                     USING ids_to_delete
1391                     WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
1392                           (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
1393                "#;
1394                diesel::sql_query(sql)
1395                    .bind::<diesel::sql_types::BigInt, _>(start as i64)
1396                    .bind::<diesel::sql_types::BigInt, _>(end as i64)
1397                    .bind::<diesel::sql_types::BigInt, _>(limit)
1398                    .execute(conn)
1399                    .map_err(IndexerError::from)
1400                    .context(
1401                        format!(
1402                            "failed to prune optimistic_transactions table by global_sequence_number range [{start}..={end}] with limit {limit}"
1403                        )
1404                        .as_str(),
1405                    )
1406            },
1407            PG_DB_COMMIT_SLEEP_DURATION
1408        )
1409    }
1410
1411    fn prune_backward_history_by_checkpoint_with_limit(
1412        &self,
1413        start: u64,
1414        end: u64,
1415        limit: i64,
1416    ) -> Result<usize, IndexerError> {
1417        use diesel::prelude::*;
1418
1419        transactional_blocking_with_retry!(
1420            &self.blocking_cp,
1421            |conn| {
1422                let sql = r#"
1423                    WITH to_delete AS (
1424                        SELECT superseded_at_checkpoint, object_id, object_version
1425                        FROM objects_backward_history
1426                        WHERE superseded_at_checkpoint BETWEEN $1 AND $2
1427                        ORDER BY superseded_at_checkpoint, object_id, object_version
1428                        FOR UPDATE
1429                        LIMIT $3
1430                    )
1431                    DELETE FROM objects_backward_history bh
1432                    USING to_delete
1433                    WHERE (bh.superseded_at_checkpoint, bh.object_id, bh.object_version) =
1434                          (to_delete.superseded_at_checkpoint, to_delete.object_id, to_delete.object_version)
1435                "#;
1436                diesel::sql_query(sql)
1437                    .bind::<diesel::sql_types::BigInt, _>(start as i64)
1438                    .bind::<diesel::sql_types::BigInt, _>(end as i64)
1439                    .bind::<diesel::sql_types::BigInt, _>(limit)
1440                    .execute(conn)
1441                    .map_err(IndexerError::from)
1442                    .context(
1443                        format!(
1444                            "failed to prune objects_backward_history by checkpoint range [{start}..={end}] with limit {limit}"
1445                        )
1446                        .as_str(),
1447                    )
1448            },
1449            PG_DB_COMMIT_SLEEP_DURATION
1450        )
1451    }
1452
1453    fn prune_cp_tx_table_by_range(&self, min_cp: u64, max_cp: u64) -> Result<(), IndexerError> {
1454        transactional_blocking_with_retry!(
1455            &self.blocking_cp,
1456            |conn| {
1457                diesel::delete(
1458                    pruner_cp_watermark::table.filter(
1459                        pruner_cp_watermark::checkpoint_sequence_number
1460                            .between(min_cp as i64, max_cp as i64),
1461                    ),
1462                )
1463                .execute(conn)
1464                .map_err(IndexerError::from)
1465                .context("Failed to prune pruner_cp_watermark table by range")?;
1466                Ok::<(), IndexerError>(())
1467            },
1468            PG_DB_COMMIT_SLEEP_DURATION
1469        )
1470    }
1471
1472    fn prune_table_by_checkpoint_range(
1473        &self,
1474        table: &crate::pruning::pruner::PrunableTable,
1475        min_checkpoint: u64,
1476        max_checkpoint: u64,
1477    ) -> Result<(), IndexerError> {
1478        use crate::pruning::pruner::PrunableTable;
1479
1480        match table {
1481            PrunableTable::Checkpoints => {
1482                self.prune_checkpoints_table_by_range(min_checkpoint, max_checkpoint)
1483            }
1484            PrunableTable::PrunerCpWatermark => {
1485                self.prune_cp_tx_table_by_range(min_checkpoint, max_checkpoint)
1486            }
1487            _ => Err(IndexerError::InvalidArgument(format!(
1488                "table {} is not pruned by checkpoint",
1489                table.as_ref()
1490            ))),
1491        }
1492    }
1493
1494    fn get_network_total_transactions_by_end_of_epoch(
1495        &self,
1496        epoch: u64,
1497    ) -> Result<Option<u64>, IndexerError> {
1498        read_only_blocking!(&self.blocking_cp, |conn| {
1499            epochs::table
1500                .filter(epochs::epoch.eq(epoch as i64))
1501                .select(epochs::network_total_transactions)
1502                .get_result::<Option<i64>>(conn)
1503        })
1504        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
1505        .map(|option| option.map(|v| v as u64))
1506    }
1507
1508    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1509        transactional_blocking_with_retry!(
1510            &self.blocking_cp,
1511            |conn| {
1512                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1513                    .execute(conn)?;
1514                Ok::<(), IndexerError>(())
1515            },
1516            PG_DB_COMMIT_SLEEP_DURATION
1517        )
1518        .tap_ok(|_| {
1519            info!("Successfully refreshed participation_metrics");
1520        })
1521        .tap_err(|e| {
1522            tracing::error!("failed to refresh participation_metrics: {e}");
1523        })
1524    }
1525
1526    fn update_watermarks_upper_bound<E: IntoEnumIterator>(
1527        &self,
1528        watermark: CommitterWatermark,
1529    ) -> Result<(), IndexerError>
1530    where
1531        E::Iterator: Iterator<Item: AsRef<str>>,
1532    {
1533        use diesel::query_dsl::methods::FilterDsl;
1534
1535        let guard = self
1536            .metrics
1537            .checkpoint_db_commit_latency_watermarks
1538            .start_timer();
1539
1540        let upper_bound_updates = E::iter()
1541            .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
1542            .collect::<Vec<_>>();
1543
1544        transactional_blocking_with_retry!(
1545            &self.blocking_cp,
1546            |conn| {
1547                diesel::insert_into(watermarks::table)
1548                    .values(&upper_bound_updates)
1549                    .on_conflict(watermarks::entity)
1550                    .do_update()
1551                    .set((
1552                        watermarks::current_epoch.eq(excluded(watermarks::current_epoch)),
1553                        watermarks::max_committed_cp.eq(excluded(watermarks::max_committed_cp)),
1554                        watermarks::max_committed_tx.eq(excluded(watermarks::max_committed_tx)),
1555                    ))
1556                    .filter(excluded(watermarks::max_committed_cp).ge(watermarks::max_committed_cp))
1557                    .filter(excluded(watermarks::max_committed_tx).ge(watermarks::max_committed_tx))
1558                    .filter(excluded(watermarks::current_epoch).ge(watermarks::current_epoch))
1559                    .execute(conn)
1560                    .map_err(IndexerError::from)
1561                    .context("Failed to update watermarks upper bound")?;
1562                Ok::<(), IndexerError>(())
1563            },
1564            PG_DB_COMMIT_SLEEP_DURATION
1565        )
1566        .tap_ok(|_| {
1567            let elapsed = guard.stop_and_record();
1568            info!(elapsed, "Persisted watermarks");
1569        })
1570        .tap_err(|e| {
1571            tracing::error!("Failed to persist watermarks with error: {}", e);
1572        })
1573    }
1574
1575    fn map_epochs_to_cp_tx(
1576        &self,
1577        epochs: &[u64],
1578    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
1579        let pool = &self.blocking_cp;
1580        let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
1581            epochs::table
1582                .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
1583                .select((
1584                    epochs::epoch,
1585                    epochs::first_checkpoint_id,
1586                    epochs::first_tx_sequence_number,
1587                ))
1588                .load::<(i64, i64, i64)>(conn)
1589        })
1590        .context("Failed to fetch first checkpoint and tx seq num for epochs")?;
1591
1592        Ok(results
1593            .into_iter()
1594            .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
1595            .collect())
1596    }
1597
1598    fn update_watermarks_lower_bound(
1599        &self,
1600        watermarks: Vec<(PrunableTable, u64)>,
1601    ) -> Result<(), IndexerError> {
1602        use diesel::query_dsl::methods::FilterDsl;
1603
1604        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
1605        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
1606        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
1607            .into_iter()
1608            .map(|(table, epoch)| {
1609                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
1610                    IndexerError::PersistentStorageDataCorruption(format!(
1611                        "epoch {epoch} not found in epoch mapping",
1612                    ))
1613                })?;
1614                Ok(StoredWatermark::from_lower_bound_update(
1615                    table.as_ref(),
1616                    epoch,
1617                    *checkpoint,
1618                    *tx,
1619                ))
1620            })
1621            .collect();
1622        let lower_bound_updates = lookups?;
1623        let guard = self
1624            .metrics
1625            .checkpoint_db_commit_latency_watermarks
1626            .start_timer();
1627        transactional_blocking_with_retry!(
1628            &self.blocking_cp,
1629            |conn| {
1630                diesel::insert_into(watermarks::table)
1631                    .values(&lower_bound_updates)
1632                    .on_conflict(watermarks::entity)
1633                    .do_update()
1634                    .set((
1635                        watermarks::min_available_cp.eq(excluded(watermarks::min_available_cp)),
1636                        watermarks::min_available_tx.eq(excluded(watermarks::min_available_tx)),
1637                        watermarks::min_available_epoch
1638                            .eq(excluded(watermarks::min_available_epoch)),
1639                        watermarks::min_bounds_updated_at_timestamp_ms.eq(sql::<
1640                            diesel::sql_types::BigInt,
1641                        >(
1642                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1643                        )),
1644                    ))
1645                    .filter(excluded(watermarks::min_available_cp).gt(watermarks::min_available_cp))
1646                    .filter(excluded(watermarks::min_available_tx).gt(watermarks::min_available_tx))
1647                    .filter(
1648                        excluded(watermarks::min_available_epoch)
1649                            .gt(watermarks::min_available_epoch),
1650                    )
1651                    .filter(
1652                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
1653                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1654                        )
1655                        .gt(watermarks::min_bounds_updated_at_timestamp_ms),
1656                    )
1657                    .execute(conn)
1658            },
1659            PG_DB_COMMIT_SLEEP_DURATION
1660        )
1661        .tap_ok(|_| {
1662            let elapsed = guard.stop_and_record();
1663            tracing::info!(elapsed, "Persisted watermarks lower bounds");
1664        })
1665        .tap_err(|e| {
1666            tracing::error!("Failed to persist watermarks with error: {}", e);
1667        })?;
1668        Ok(())
1669    }
1670
1671    fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
1672        // read_only transaction, otherwise this will block and get blocked by write
1673        // transactions to the same table.
1674        run_query_with_retry!(
1675            &self.blocking_cp,
1676            |conn| {
1677                let stored = watermarks::table
1678                    .load::<StoredWatermark>(conn)
1679                    .map_err(Into::into)
1680                    .context("Failed reading watermarks from PostgresDB")?;
1681                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
1682                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1683                ))
1684                .get_result(conn)
1685                .map_err(Into::into)
1686                .context("Failed reading current timestamp from PostgresDB")?;
1687                Ok::<_, IndexerError>((stored, timestamp))
1688            },
1689            PG_DB_COMMIT_SLEEP_DURATION
1690        )
1691    }
1692
1693    fn update_watermark_lowest_unpruned_key(
1694        &self,
1695        table: &PrunableTable,
1696        lowest_unpruned_key: u64,
1697    ) -> Result<(), IndexerError> {
1698        transactional_blocking_with_retry!(
1699            &self.blocking_cp,
1700            |conn| {
1701                diesel::update(watermarks::table.filter(watermarks::entity.eq(table.as_ref())))
1702                    .set(watermarks::lowest_unpruned_key.eq(lowest_unpruned_key as i64))
1703                    .execute(conn)
1704                    .map_err(IndexerError::from)
1705                    .context("failed to update watermark lowest_unpruned_key")?;
1706                Ok::<(), IndexerError>(())
1707            },
1708            PG_DB_COMMIT_SLEEP_DURATION
1709        )
1710    }
1711
1712    fn get_watermark_by_entity(
1713        &self,
1714        entity: &str,
1715    ) -> Result<Option<StoredWatermark>, IndexerError> {
1716        run_query_with_retry!(
1717            &self.blocking_cp,
1718            |conn| {
1719                watermarks::table
1720                    .filter(watermarks::entity.eq(entity))
1721                    .first::<StoredWatermark>(conn)
1722                    .optional()
1723                    .map_err(Into::into)
1724                    .context("failed reading watermark by entity from PostgresDB")
1725            },
1726            PG_DB_COMMIT_SLEEP_DURATION
1727        )
1728    }
1729
1730    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1731    where
1732        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1733        R: Send + 'static,
1734    {
1735        let this = self.clone();
1736        let current_span = tracing::Span::current();
1737        tokio::task::spawn_blocking(move || {
1738            mark_in_blocking_pool();
1739            let _guard = current_span.enter();
1740            f(this)
1741        })
1742        .await
1743        .map_err(Into::into)
1744        .and_then(std::convert::identity)
1745    }
1746
1747    fn spawn_blocking_task<F, R>(
1748        &self,
1749        f: F,
1750    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1751    where
1752        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1753        R: Send + 'static,
1754    {
1755        let this = self.clone();
1756        let current_span = tracing::Span::current();
1757        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1758        tokio::task::spawn_blocking(move || {
1759            mark_in_blocking_pool();
1760            let _guard = current_span.enter();
1761            let _elapsed = guard.stop_and_record();
1762            f(this)
1763        })
1764    }
1765
1766    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1767    where
1768        F: FnOnce(Self) -> Fut + Send + 'static,
1769        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1770        R: Send + 'static,
1771    {
1772        let this = self.clone();
1773        tokio::task::spawn(async move { f(this).await })
1774    }
1775}
1776
1777#[async_trait]
1778impl IndexerStore for PgIndexerStore {
1779    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1780        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1781            .await
1782    }
1783
1784    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1785        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1786            .await
1787    }
1788
1789    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1790        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1791            .await
1792    }
1793
1794    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1795        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1796            .await
1797    }
1798
1799    fn persist_objects_in_existing_transaction(
1800        &self,
1801        conn: &mut PgConnection,
1802        object_changes: Vec<TransactionObjectChangesToCommit>,
1803    ) -> Result<(), IndexerError> {
1804        if object_changes.is_empty() {
1805            return Ok(());
1806        }
1807
1808        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1809        let object_mutations = indexed_mutations
1810            .into_iter()
1811            .map(StoredObject::from)
1812            .collect::<Vec<_>>();
1813        let object_deletions = indexed_deletions
1814            .into_iter()
1815            .map(StoredDeletedObject::from)
1816            .collect::<Vec<_>>();
1817
1818        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1819        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1820
1821        Ok(())
1822    }
1823
1824    async fn persist_object_versions(
1825        &self,
1826        object_versions: Vec<StoredObjectVersion>,
1827    ) -> Result<(), IndexerError> {
1828        if object_versions.is_empty() {
1829            return Ok(());
1830        }
1831
1832        let guard = self
1833            .metrics
1834            .checkpoint_db_commit_latency_objects_version
1835            .start_timer();
1836
1837        let object_versions_count = object_versions.len();
1838
1839        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1840        let futures = chunks
1841            .into_iter()
1842            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1843            .collect::<Vec<_>>();
1844
1845        futures::future::try_join_all(futures)
1846            .await
1847            .map_err(|e| {
1848                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
1849                IndexerError::from(e)
1850            })?
1851            .into_iter()
1852            .collect::<Result<Vec<_>, _>>()
1853            .map_err(|e| {
1854                IndexerError::PostgresWrite(format!(
1855                    "Failed to persist all objects version chunks: {e:?}"
1856                ))
1857            })?;
1858        let elapsed = guard.stop_and_record();
1859        info!(elapsed, "Persisted {object_versions_count} object versions");
1860        Ok(())
1861    }
1862
1863    async fn persist_checkpoints(
1864        &self,
1865        checkpoints: Vec<IndexedCheckpoint>,
1866    ) -> Result<(), IndexerError> {
1867        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1868            .await
1869    }
1870
1871    async fn persist_transactions(
1872        &self,
1873        transactions: Vec<IndexedTransaction>,
1874    ) -> Result<(), IndexerError> {
1875        let guard = self
1876            .metrics
1877            .checkpoint_db_commit_latency_transactions
1878            .start_timer();
1879        let len = transactions.len();
1880
1881        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1882        let futures = chunks
1883            .into_iter()
1884            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1885
1886        futures::future::try_join_all(futures)
1887            .await
1888            .map_err(|e| {
1889                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
1890                IndexerError::from(e)
1891            })?
1892            .into_iter()
1893            .collect::<Result<Vec<_>, _>>()
1894            .map_err(|e| {
1895                IndexerError::PostgresWrite(format!(
1896                    "Failed to persist all transactions chunks: {e:?}"
1897                ))
1898            })?;
1899        let elapsed = guard.stop_and_record();
1900        info!(elapsed, "Persisted {} transactions", len);
1901        Ok(())
1902    }
1903
1904    fn persist_optimistic_transaction_in_existing_transaction(
1905        &self,
1906        conn: &mut PgConnection,
1907        transaction: OptimisticTransaction,
1908    ) -> Result<(), IndexerError> {
1909        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1910        Ok(())
1911    }
1912
1913    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1914        if events.is_empty() {
1915            return Ok(());
1916        }
1917        let len = events.len();
1918        let guard = self
1919            .metrics
1920            .checkpoint_db_commit_latency_events
1921            .start_timer();
1922        let chunks = chunk!(events, self.config.parallel_chunk_size);
1923        let futures = chunks
1924            .into_iter()
1925            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1926
1927        futures::future::try_join_all(futures)
1928            .await
1929            .map_err(|e| {
1930                tracing::error!("failed to join persist_events_chunk futures: {e}");
1931                IndexerError::from(e)
1932            })?
1933            .into_iter()
1934            .collect::<Result<Vec<_>, _>>()
1935            .map_err(|e| {
1936                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
1937            })?;
1938        let elapsed = guard.stop_and_record();
1939        info!(elapsed, "Persisted {} events", len);
1940        Ok(())
1941    }
1942
1943    async fn persist_displays(
1944        &self,
1945        display_updates: BTreeMap<String, StoredDisplay>,
1946    ) -> Result<(), IndexerError> {
1947        if display_updates.is_empty() {
1948            return Ok(());
1949        }
1950
1951        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
1952            .await?
1953    }
1954
1955    fn persist_displays_in_existing_transaction(
1956        &self,
1957        conn: &mut PgConnection,
1958        display_updates: Vec<&StoredDisplay>,
1959    ) -> Result<(), IndexerError> {
1960        if display_updates.is_empty() {
1961            return Ok(());
1962        }
1963
1964        on_conflict_do_update_with_condition!(
1965            display::table,
1966            display_updates,
1967            display::object_type,
1968            (
1969                display::id.eq(excluded(display::id)),
1970                display::version.eq(excluded(display::version)),
1971                display::bcs.eq(excluded(display::bcs)),
1972            ),
1973            excluded(display::version).gt(display::version),
1974            conn
1975        );
1976
1977        Ok(())
1978    }
1979
1980    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1981        if packages.is_empty() {
1982            return Ok(());
1983        }
1984        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
1985            .await
1986    }
1987
1988    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
1989        if indices.is_empty() {
1990            return Ok(());
1991        }
1992        let len = indices.len();
1993        let guard = self
1994            .metrics
1995            .checkpoint_db_commit_latency_event_indices
1996            .start_timer();
1997        let chunks = chunk!(indices, self.config.parallel_chunk_size);
1998
1999        let futures = chunks.into_iter().map(|chunk| {
2000            self.spawn_task(move |this: Self| async move {
2001                this.persist_event_indices_chunk(chunk).await
2002            })
2003        });
2004
2005        futures::future::try_join_all(futures)
2006            .await
2007            .map_err(|e| {
2008                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
2009                IndexerError::from(e)
2010            })?
2011            .into_iter()
2012            .collect::<Result<Vec<_>, _>>()
2013            .map_err(|e| {
2014                IndexerError::PostgresWrite(format!(
2015                    "Failed to persist all event_indices chunks: {e:?}"
2016                ))
2017            })?;
2018        let elapsed = guard.stop_and_record();
2019        info!(elapsed, "Persisted {} event_indices chunks", len);
2020        Ok(())
2021    }
2022
2023    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2024        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2025            .await
2026    }
2027
2028    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2029        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2030            .await
2031    }
2032
2033    async fn get_network_total_transactions_by_end_of_epoch(
2034        &self,
2035        epoch: u64,
2036    ) -> Result<Option<u64>, IndexerError> {
2037        self.execute_in_blocking_worker(move |this| {
2038            this.get_network_total_transactions_by_end_of_epoch(epoch)
2039        })
2040        .await
2041    }
2042
2043    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2044        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2045            .await
2046    }
2047
2048    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
2049        &self,
2050        watermark: CommitterWatermark,
2051    ) -> Result<(), IndexerError>
2052    where
2053        E::Iterator: Iterator<Item: AsRef<str>>,
2054    {
2055        self.execute_in_blocking_worker(move |this| {
2056            this.update_watermarks_upper_bound::<E>(watermark)
2057        })
2058        .await
2059    }
2060
2061    fn as_any(&self) -> &dyn StdAny {
2062        self
2063    }
2064
2065    /// Persist protocol configs and feature flags until the protocol version
2066    /// for the latest epoch we have stored in the db, inclusive.
2067    fn persist_protocol_configs_and_feature_flags(
2068        &self,
2069        chain_id: Vec<u8>,
2070    ) -> Result<(), IndexerError> {
2071        let chain_id = ChainIdentifier::from(
2072            CheckpointDigest::from_bytes(chain_id).expect("unable to convert chain id"),
2073        );
2074
2075        let mut all_configs = vec![];
2076        let mut all_flags = vec![];
2077
2078        let (start_version, end_version) = self.get_protocol_version_index_range()?;
2079        info!(
2080            "Persisting protocol configs with start_version: {}, end_version: {}",
2081            start_version, end_version
2082        );
2083
2084        // Gather all protocol configs and feature flags for all versions between start
2085        // and end.
2086        for version in start_version..=end_version {
2087            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2088                (version as u64).into(),
2089                chain_id.chain(),
2090            )
2091            .ok_or(IndexerError::Generic(format!(
2092                "Unable to fetch protocol version {} and chain {:?}",
2093                version,
2094                chain_id.chain()
2095            )))?;
2096            let configs_vec = protocol_configs
2097                .attr_map()
2098                .into_iter()
2099                .map(|(k, v)| StoredProtocolConfig {
2100                    protocol_version: version,
2101                    config_name: k,
2102                    config_value: v.map(|v| v.to_string()),
2103                })
2104                .collect::<Vec<_>>();
2105            all_configs.extend(configs_vec);
2106
2107            let feature_flags = protocol_configs
2108                .feature_map()
2109                .into_iter()
2110                .map(|(k, v)| StoredFeatureFlag {
2111                    protocol_version: version,
2112                    flag_name: k,
2113                    flag_value: v,
2114                })
2115                .collect::<Vec<_>>();
2116            all_flags.extend(feature_flags);
2117        }
2118
2119        transactional_blocking_with_retry!(
2120            &self.blocking_cp,
2121            |conn| {
2122                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2123                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2124                }
2125                for flag_chunk in all_flags.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2126                    insert_or_ignore_into!(feature_flags::table, flag_chunk, conn);
2127                }
2128                Ok::<(), IndexerError>(())
2129            },
2130            PG_DB_COMMIT_SLEEP_DURATION
2131        )?;
2132        Ok(())
2133    }
2134
2135    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2136        if indices.is_empty() {
2137            return Ok(());
2138        }
2139        let len = indices.len();
2140        let guard = self
2141            .metrics
2142            .checkpoint_db_commit_latency_tx_indices
2143            .start_timer();
2144        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2145
2146        let futures = chunks.into_iter().map(|chunk| {
2147            self.spawn_task(move |this: Self| async move {
2148                this.persist_tx_indices_chunk_v2(chunk).await
2149            })
2150        });
2151        futures::future::try_join_all(futures)
2152            .await
2153            .map_err(|e| {
2154                tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
2155                IndexerError::from(e)
2156            })?
2157            .into_iter()
2158            .collect::<Result<Vec<_>, _>>()
2159            .map_err(|e| {
2160                IndexerError::PostgresWrite(format!(
2161                    "Failed to persist all tx_indices chunks: {e:?}"
2162                ))
2163            })?;
2164        let elapsed = guard.stop_and_record();
2165        info!(elapsed, "Persisted {} tx_indices chunks", len);
2166        Ok(())
2167    }
2168
2169    async fn persist_objects(
2170        &self,
2171        objects: Vec<CheckpointObjectChanges>,
2172    ) -> Result<(), IndexerError> {
2173        if objects.is_empty() {
2174            return Ok(());
2175        }
2176        let guard = self
2177            .metrics
2178            .checkpoint_db_commit_latency_objects
2179            .start_timer();
2180        let CheckpointObjectChanges {
2181            changed_objects: mutations,
2182            deleted_objects: deletions,
2183        } = retain_latest_objects_from_checkpoint_batch(objects);
2184        let mutation_len = mutations.len();
2185        let deletion_len = deletions.len();
2186
2187        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
2188        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
2189        let mutation_futures = mutation_chunks
2190            .into_iter()
2191            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
2192        let deletion_futures = deletion_chunks
2193            .into_iter()
2194            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
2195        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
2196            .await
2197            .map_err(|e| {
2198                tracing::error!("failed to join futures for persisting objects: {e}");
2199                IndexerError::from(e)
2200            })?
2201            .into_iter()
2202            .collect::<Result<Vec<_>, _>>()
2203            .map_err(|e| {
2204                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}",))
2205            })?;
2206
2207        let elapsed = guard.stop_and_record();
2208        info!(
2209            elapsed,
2210            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
2211        );
2212        Ok(())
2213    }
2214
2215    async fn persist_object_backward_history(
2216        &self,
2217        objects: Vec<StoredBackwardHistoryObject>,
2218    ) -> Result<(), IndexerError> {
2219        if objects.is_empty() {
2220            return Ok(());
2221        }
2222        let len = objects.len();
2223        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
2224        let futures = chunks.into_iter().map(|c| {
2225            self.spawn_blocking_task(move |this| this.persist_objects_backward_history_chunk(c))
2226        });
2227
2228        futures::future::try_join_all(futures)
2229            .await
2230            .map_err(|e| {
2231                tracing::error!(
2232                    "failed to join persist_objects_backward_history_chunk futures: {e}"
2233                );
2234                IndexerError::from(e)
2235            })?
2236            .into_iter()
2237            .collect::<Result<Vec<_>, _>>()
2238            .map_err(|e| {
2239                IndexerError::PostgresWrite(format!(
2240                    "Failed to persist all objects backward history chunks: {e:?}"
2241                ))
2242            })?;
2243        info!("Persisted {} objects backward history", len);
2244        Ok(())
2245    }
2246
2247    async fn persist_checkpointed_objects(
2248        &self,
2249        objects: Vec<CheckpointObjectChanges>,
2250    ) -> Result<(), IndexerError> {
2251        if objects.is_empty() {
2252            return Ok(());
2253        }
2254        let CheckpointObjectChanges {
2255            changed_objects: mutations,
2256            deleted_objects: deletions,
2257        } = retain_latest_objects_from_checkpoint_batch(objects);
2258        let mutation_len = mutations.len();
2259        let deletion_len = deletions.len();
2260
2261        let checkpointed_objects: Vec<StoredCheckpointedObject> = mutations
2262            .into_iter()
2263            .map(|live| {
2264                let (indexed, _tx_digest) = live.split();
2265                StoredCheckpointedObject::try_from(indexed)
2266            })
2267            .chain(
2268                deletions
2269                    .into_iter()
2270                    .map(|removed| Ok(StoredCheckpointedObject::from(removed.indexed_object))),
2271            )
2272            .collect::<Result<Vec<_>, IndexerError>>()?;
2273
2274        let len = checkpointed_objects.len();
2275        let chunks = chunk!(
2276            checkpointed_objects,
2277            self.config.parallel_objects_chunk_size
2278        );
2279        let futures = chunks.into_iter().map(|c| {
2280            self.spawn_blocking_task(move |this| this.persist_checkpointed_objects_chunk(c))
2281        });
2282        futures::future::try_join_all(futures)
2283            .await
2284            .map_err(|e| {
2285                tracing::error!("failed to join futures for persisting checkpointed objects: {e}");
2286                IndexerError::from(e)
2287            })?
2288            .into_iter()
2289            .collect::<Result<Vec<_>, _>>()
2290            .map_err(|e| {
2291                IndexerError::PostgresWrite(format!(
2292                    "Failed to persist all checkpointed object chunks: {e:?}",
2293                ))
2294            })?;
2295
2296        info!(
2297            "Persisted {len} checkpointed objects ({mutation_len} mutations, {deletion_len} deletions)",
2298        );
2299        Ok(())
2300    }
2301
2302    async fn persist_tx_global_order(
2303        &self,
2304        tx_order: Vec<TxGlobalOrder>,
2305    ) -> Result<(), IndexerError> {
2306        let guard = self
2307            .metrics
2308            .checkpoint_db_commit_latency_tx_insertion_order
2309            .start_timer();
2310        let len = tx_order.len();
2311
2312        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2313        let futures = chunks
2314            .into_iter()
2315            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));
2316
2317        futures::future::try_join_all(futures)
2318            .await
2319            .map_err(|e| {
2320                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}",);
2321                IndexerError::from(e)
2322            })?
2323            .into_iter()
2324            .collect::<Result<Vec<_>, _>>()
2325            .map_err(|e| {
2326                IndexerError::PostgresWrite(format!(
2327                    "Failed to persist all txs insertion order chunks: {e:?}",
2328                ))
2329            })?;
2330        let elapsed = guard.stop_and_record();
2331        info!(elapsed, "Persisted {len} txs insertion orders");
2332        Ok(())
2333    }
2334
2335    async fn update_watermarks_lower_bound(
2336        &self,
2337        watermarks: Vec<(PrunableTable, u64)>,
2338    ) -> Result<(), IndexerError> {
2339        self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
2340            .await
2341    }
2342
2343    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
2344        self.execute_in_blocking_worker(move |this| this.get_watermarks())
2345            .await
2346    }
2347
2348    async fn prune_table_by_checkpoint_range(
2349        &self,
2350        table: &crate::pruning::pruner::PrunableTable,
2351        min_checkpoint: u64,
2352        max_checkpoint: u64,
2353    ) -> Result<(), IndexerError> {
2354        let table_clone = *table;
2355        self.execute_in_blocking_worker(move |this| {
2356            this.prune_table_by_checkpoint_range(&table_clone, min_checkpoint, max_checkpoint)
2357        })
2358        .await
2359    }
2360
2361    async fn prune_table_by_tx_range(
2362        &self,
2363        table: &crate::pruning::pruner::PrunableTable,
2364        min_tx: u64,
2365        max_tx: u64,
2366    ) -> Result<(), IndexerError> {
2367        let table_clone = *table;
2368        self.execute_in_blocking_worker(move |this| {
2369            this.prune_single_tx_or_event_table(&table_clone, min_tx, max_tx)
2370        })
2371        .await
2372    }
2373
2374    async fn prune_table_by_global_seq_with_limit(
2375        &self,
2376        table: &crate::pruning::pruner::PrunableTable,
2377        start: u64,
2378        end: u64,
2379        limit: i64,
2380    ) -> Result<usize, IndexerError> {
2381        use crate::pruning::pruner::PrunableTable;
2382
2383        if !matches!(table, PrunableTable::OptimisticTransactions) {
2384            return Err(IndexerError::InvalidArgument(format!(
2385                "table {} does not support pruning by global order with limit",
2386                table.as_ref()
2387            )));
2388        }
2389
2390        self.execute_in_blocking_worker(move |this| {
2391            this.prune_optimistic_tx_by_global_seq(start, end, limit)
2392        })
2393        .await
2394    }
2395
2396    async fn prune_table_by_checkpoint_with_limit(
2397        &self,
2398        table: &crate::pruning::pruner::PrunableTable,
2399        start: u64,
2400        end: u64,
2401        limit: i64,
2402    ) -> Result<usize, IndexerError> {
2403        use crate::pruning::pruner::PrunableTable;
2404
2405        if !matches!(table, PrunableTable::ObjectsBackwardHistory) {
2406            return Err(IndexerError::InvalidArgument(format!(
2407                "table {} does not support pruning by checkpoint with limit",
2408                table.as_ref()
2409            )));
2410        }
2411
2412        self.execute_in_blocking_worker(move |this| {
2413            this.prune_backward_history_by_checkpoint_with_limit(start, end, limit)
2414        })
2415        .await
2416    }
2417
2418    async fn update_watermark_lowest_unpruned_key(
2419        &self,
2420        table: &PrunableTable,
2421        lowest_unpruned_key: u64,
2422    ) -> Result<(), IndexerError> {
2423        let table = *table;
2424        self.execute_in_blocking_worker(move |this| {
2425            this.update_watermark_lowest_unpruned_key(&table, lowest_unpruned_key)
2426        })
2427        .await
2428    }
2429
2430    async fn get_watermark_by_entity(
2431        &self,
2432        entity: String,
2433    ) -> Result<Option<StoredWatermark>, IndexerError> {
2434        self.execute_in_blocking_worker(move |this| this.get_watermark_by_entity(&entity))
2435            .await
2436    }
2437}
2438
2439/// Partitions object changes into deletions and mutations.
2440///
2441/// Retains only the highest version of each object among deletions and
2442/// mutations. This allows concurrent insertion into the DB of the resulting
2443/// partitions.
2444fn retain_latest_indexed_objects(
2445    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2446) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2447    use std::collections::HashMap;
2448
2449    let mut mutations = HashMap::<ObjectId, IndexedObject>::new();
2450    let mut deletions = HashMap::<ObjectId, IndexedDeletedObject>::new();
2451
2452    for change in tx_object_changes {
2453        // Remove mutation / deletion with a following deletion / mutation,
2454        // as we expect that following deletion / mutation has a higher version.
2455        // Technically, assertions below are not required, double check just in case.
2456        for mutation in change.changed_objects {
2457            let id = mutation.object.id();
2458            let version = mutation.object.version();
2459
2460            if let Some(existing) = deletions.remove(&id) {
2461                assert!(
2462                    existing.object_version < version,
2463                    "mutation version ({version}) should be greater than existing deletion version ({}) for object {id}",
2464                    existing.object_version
2465                );
2466            }
2467
2468            if let Some(existing) = mutations.insert(id, mutation) {
2469                assert!(
2470                    existing.object.version() < version,
2471                    "mutation version ({version}) should be greater than existing mutation version ({}) for object {id}",
2472                    existing.object.version()
2473                );
2474            }
2475        }
2476        // Handle deleted objects
2477        for deletion in change.deleted_objects {
2478            let id = deletion.object_id;
2479            let version = deletion.object_version;
2480
2481            if let Some(existing) = mutations.remove(&id) {
2482                assert!(
2483                    existing.object.version() < version,
2484                    "deletion version ({version}) should be greater than existing mutation version ({}) for object {id}",
2485                    existing.object.version(),
2486                );
2487            }
2488
2489            if let Some(existing) = deletions.insert(id, deletion) {
2490                assert!(
2491                    existing.object_version < version,
2492                    "deletion version ({version}) should be greater than existing deletion version ({}) for object {id}",
2493                    existing.object_version
2494                );
2495            }
2496        }
2497    }
2498
2499    (
2500        mutations.into_values().collect(),
2501        deletions.into_values().collect(),
2502    )
2503}