iota_indexer/store/pg_indexer_store.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use core::result::Result::Ok;
use std::{
    any::Any as StdAny,
    collections::{BTreeMap, HashMap},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use diesel::{
    ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
    dsl::{max, min},
    upsert::excluded,
};
use downcast::Any;
use futures::future::Either;
use iota_protocol_config::ProtocolConfig;
use iota_types::{
    base_types::ObjectID,
    digests::{ChainIdentifier, CheckpointDigest},
};
use itertools::Itertools;
use tap::TapFallible;
use tracing::info;

use super::{
    IndexerStore,
    pg_partition_manager::{EpochPartitionData, PgPartitionManager},
};
use crate::{
    db::ConnectionPool,
    errors::{Context, IndexerError},
    handlers::{EpochToCommit, TransactionObjectChangesToCommit},
    insert_or_ignore_into,
    metrics::IndexerMetrics,
    models::{
        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
        display::StoredDisplay,
        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
        event_indices::OptimisticEventIndices,
        events::{OptimisticEvent, StoredEvent},
        obj_indices::StoredObjectVersion,
        objects::{StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot},
        packages::StoredPackage,
        transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
        tx_indices::OptimisticTxIndices,
    },
    on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
    schema::{
        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
        objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
        optimistic_event_senders, optimistic_event_struct_instantiation,
        optimistic_event_struct_module, optimistic_event_struct_name,
        optimistic_event_struct_package, optimistic_events, optimistic_transactions,
        optimistic_tx_calls_fun, optimistic_tx_calls_mod, optimistic_tx_calls_pkg,
        optimistic_tx_changed_objects, optimistic_tx_input_objects, optimistic_tx_kinds,
        optimistic_tx_recipients, optimistic_tx_senders, packages, protocol_configs,
        pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
        tx_changed_objects, tx_digests, tx_input_objects, tx_insertion_order, tx_kinds,
        tx_recipients, tx_senders,
    },
    transactional_blocking_with_retry,
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, TxIndex,
    },
};

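/// Splits `$data` into consecutive chunks of at most `$size` elements,
/// collecting each chunk into its own `Vec`.
///
/// A usage sketch (with hypothetical values):
/// ```ignore
/// let batches: Vec<Vec<i32>> = chunk!(vec![1, 2, 3, 4, 5], 2);
/// // batches == vec![vec![1, 2], vec![3, 4], vec![5]]
/// ```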
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}

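/// Deletes all rows of `$table` whose `tx_sequence_number` lies in the
/// inclusive range `[$min_tx, $max_tx]`, attaching `$context_msg` to any error.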
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}

// Within one DB transaction, an update may be split into several statements;
// this is the number of rows to update per statement.
// TODO: with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` may now
// be less relevant. We should run experiments and remove it if that's confirmed.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
// The number of rows to update in one DB transaction.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
// The number of rows to update in one DB transaction, specifically for objects.
// Setting this too high may cause many DB deadlocks because of optimistic
// locking.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);

#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    pub parallel_chunk_size: usize,
    pub parallel_objects_chunk_size: usize,
    #[expect(unused)]
    pub epochs_to_keep: Option<u64>,
}

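/// Postgres-backed implementation of the indexer store, built on a blocking
/// connection pool and a partition manager for epoch-partitioned tables.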
pub struct PgIndexerStore {
    blocking_cp: ConnectionPool,
    metrics: IndexerMetrics,
    partition_manager: PgPartitionManager,
    config: PgIndexerStoreConfig,
}

impl Clone for PgIndexerStore {
    fn clone(&self) -> PgIndexerStore {
        Self {
            blocking_cp: self.blocking_cp.clone(),
            metrics: self.metrics.clone(),
            partition_manager: self.partition_manager.clone(),
            config: self.config.clone(),
        }
    }
}

impl PgIndexerStore {
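    /// Creates a new store, reading the parallel chunk sizes from the
    /// `PG_COMMIT_PARALLEL_CHUNK_SIZE` and
    /// `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` environment variables (falling
    /// back to the defaults above) and the optional epoch retention window
    /// from `EPOCHS_TO_KEEP`.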
    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .expect("PG_COMMIT_PARALLEL_CHUNK_SIZE must be a valid usize");
        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .expect("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE must be a valid usize");
        let epochs_to_keep = std::env::var("EPOCHS_TO_KEEP")
            .map(|s| s.parse::<u64>().ok())
            .unwrap_or_else(|_e| None);
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
            .expect("Failed to initialize partition manager");
        let config = PgIndexerStoreConfig {
            parallel_chunk_size,
            parallel_objects_chunk_size,
            epochs_to_keep,
        };

        Self {
            blocking_cp,
            metrics,
            partition_manager,
            config,
        }
    }

    pub fn blocking_cp(&self) -> ConnectionPool {
        self.blocking_cp.clone()
    }

    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::epoch))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest epoch id from PostgresDB")
    }

    /// Get the range of the protocol versions that need to be indexed.
    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        // We start indexing from the next protocol version after the latest one stored
        // in the db.
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        // We end indexing at the protocol version of the latest epoch stored in the db.
        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }

    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }

    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select(max(checkpoints::sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest checkpoint sequence number from PostgresDB")
    }

    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select((
                    min(checkpoints::sequence_number),
                    max(checkpoints::sequence_number),
                ))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
    }

    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((min(epochs::epoch), max(epochs::epoch)))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max epoch numbers from PostgresDB")
    }

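    /// Returns the smallest checkpoint sequence number still tracked in
    /// `pruner_cp_watermark`, i.e. the minimum prunable checkpoint.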
    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.unwrap_or_default() as u64)
        })
        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
    }

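    /// Returns the first checkpoint id of the given epoch and, if the epoch
    /// has already ended, its last checkpoint id.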
    fn get_checkpoint_range_for_epoch(
        &self,
        epoch: u64,
    ) -> Result<(u64, Option<u64>), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
                .filter(epochs::epoch.eq(epoch as i64))
                .first::<(i64, Option<i64>)>(conn)
                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
        })
        .context("Failed reading checkpoint range from PostgresDB")
    }

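    /// Returns the min and max transaction sequence numbers belonging to the
    /// given checkpoint, as recorded in `pruner_cp_watermark`.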
    fn get_transaction_range_for_checkpoint(
        &self,
        checkpoint: u64,
    ) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select((
                    pruner_cp_watermark::min_tx_sequence_number,
                    pruner_cp_watermark::max_tx_sequence_number,
                ))
                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
                .first::<(i64, i64)>(conn)
                .map(|(min, max)| (min as u64, max as u64))
        })
        .context("Failed reading transaction range from PostgresDB")
    }

    fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            objects_snapshot::dsl::objects_snapshot
                .select(max(objects_snapshot::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
    }

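    /// Upserts `Display` updates keyed by object type, overwriting the id,
    /// version, and BCS bytes of any existing entry.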
    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update!(
                    display::table,
                    display_updates.values().collect::<Vec<_>>(),
                    display::object_type,
                    (
                        display::id.eq(excluded(display::id)),
                        display::version.eq(excluded(display::version)),
                        display::bcs.eq(excluded(display::bcs)),
                    ),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }

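    /// Upserts a chunk of mutated objects into the live `objects` table,
    /// replacing all columns of any existing row with the same object id.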
    fn persist_object_mutation_chunk(
        &self,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = mutated_object_mutation_chunk.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update!(
                    objects::table,
                    mutated_object_mutation_chunk.clone(),
                    objects::object_id,
                    (
                        objects::object_id.eq(excluded(objects::object_id)),
                        objects::object_version.eq(excluded(objects::object_version)),
                        objects::object_digest.eq(excluded(objects::object_digest)),
                        objects::owner_type.eq(excluded(objects::owner_type)),
                        objects::owner_id.eq(excluded(objects::owner_id)),
                        objects::object_type.eq(excluded(objects::object_type)),
                        objects::serialized_object.eq(excluded(objects::serialized_object)),
                        objects::coin_type.eq(excluded(objects::coin_type)),
                        objects::coin_balance.eq(excluded(objects::coin_balance)),
                        objects::df_kind.eq(excluded(objects::df_kind)),
                    ),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked objects", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object mutations with error: {}", e);
        })
    }

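    /// Deletes a chunk of removed objects from the live `objects` table by
    /// object id.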
    fn persist_object_deletion_chunk(
        &self,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = deleted_objects_chunk.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    objects::table.filter(
                        objects::object_id.eq_any(
                            deleted_objects_chunk
                                .iter()
                                .map(|o| o.object_id.clone())
                                .collect::<Vec<_>>(),
                        ),
                    ),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to write object deletion to PostgresDB")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {} chunked objects", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object deletions with error: {}", e);
        })
    }

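    /// Upserts a chunk of object snapshot rows into `objects_snapshot`,
    /// writing `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows per statement.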
    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object snapshot with error: {}", e);
        })
    }

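    /// Appends a chunk of historical object states to `objects_history`,
    /// ignoring rows that already exist.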
    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object history with error: {}", e);
        })
    }

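    /// Inserts a chunk of object version rows into `objects_version`, ignoring
    /// rows that already exist.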
    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object version chunk with error: {}", e);
        })
    }

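    /// Persists checkpoints together with their per-checkpoint transaction
    /// watermarks. For the genesis checkpoint (sequence number 0), also stores
    /// the chain identifier, protocol configs, and feature flags.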
    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        // If the first checkpoint has sequence number 0, we need to persist the digest
        // as chain identifier.
        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics
                            .max_committed_checkpoint_sequence_number
                            .set(stored_checkpoint.sequence_number);
                        self.metrics
                            .committed_checkpoint_timestamp_ms
                            .set(stored_checkpoint.timestamp_ms);
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!(
                            "Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}",
                            stored_checkpoint.sequence_number,
                            time_now_ms,
                            stored_checkpoint.timestamp_ms
                        );
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist checkpoints with error: {}", e);
        })
    }

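    /// Converts a chunk of indexed transactions into their stored form and
    /// inserts them into the `transactions` table, ignoring duplicates.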
    fn persist_transactions_chunk(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks
            .start_timer();
        let transformation_guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks_transformation
            .start_timer();
        let transactions = transactions
            .iter()
            .map(StoredTransaction::from)
            .collect::<Vec<_>>();
        drop(transformation_guard);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked transactions",
                transactions.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist transactions with error: {}", e);
        })
    }

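    /// Records the insertion order of transactions by digest in
    /// `tx_insertion_order`, ignoring digests that are already present.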
    fn persist_tx_insertion_order_chunk(
        &self,
        tx_order: Vec<TxInsertionOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(tx_insertion_order::table, tx_order_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist txs insertion order with error: {e}");
        })
    }

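    /// Inserts a chunk of events into the `events` table, ignoring duplicates.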
    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events_chunks
            .start_timer();
        let len = events.len();
        let events = events
            .into_iter()
            .map(StoredEvent::from)
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(events::table, event_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked events", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist events with error: {}", e);
        })
    }

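    /// Upserts Move packages into the `packages` table, overwriting the stored
    /// package bytes of an existing package id.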
    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_packages
            .start_timer();
        let packages = packages
            .into_iter()
            .map(StoredPackage::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update!(
                        packages::table,
                        packages_chunk,
                        packages::package_id,
                        (
                            packages::package_id.eq(excluded(packages::package_id)),
                            packages::move_package.eq(excluded(packages::move_package)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} packages", packages.len());
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist packages with error: {}", e);
        })
    }

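    /// Splits each `EventIndex` into its seven per-table index rows and
    /// persists each batch in parallel, one blocking task per index table.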
    async fn persist_event_indices_chunk(
        &self,
        indices: Vec<EventIndex>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices_chunks
            .start_timer();
        let len = indices.len();
        let (
            event_emit_packages,
            event_emit_modules,
            event_senders,
            event_struct_packages,
            event_struct_modules,
            event_struct_names,
            event_struct_instantiations,
        ) = indices.into_iter().map(|i| i.split()).fold(
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            ),
            |(
                mut event_emit_packages,
                mut event_emit_modules,
                mut event_senders,
                mut event_struct_packages,
                mut event_struct_modules,
                mut event_struct_names,
                mut event_struct_instantiations,
            ),
             index| {
                event_emit_packages.push(index.0);
                event_emit_modules.push(index.1);
                event_senders.push(index.2);
                event_struct_packages.push(index.3);
                event_struct_modules.push(index.4);
                event_struct_names.push(index.5);
                event_struct_instantiations.push(index.6);
                (
                    event_emit_packages,
                    event_emit_modules,
                    event_senders,
                    event_struct_packages,
                    event_struct_modules,
                    event_struct_names,
                    event_struct_instantiations,
                )
            },
        );

        // Now persist all the event indices in parallel into their tables.
        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_package::table,
                event_emit_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_module::table,
                event_emit_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_package::table,
                event_struct_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_module::table,
                event_struct_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_name::table,
                event_struct_names,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_instantiation::table,
                event_struct_instantiations,
                &this.blocking_cp
            )
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join event indices futures in a chunk: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event indices in a chunk: {:?}",
                    e
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked event indices", len);
        Ok(())
    }

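    /// Splits each `TxIndex` into its per-table rows (senders, recipients,
    /// input and changed objects, calls, digests, kinds) and persists them in
    /// parallel, one blocking task per table group.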
    async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices_chunks
            .start_timer();
        let len = indices.len();
        let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
            indices.into_iter().map(|i| i.split()).fold(
                (
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                ),
                |(
                    mut tx_senders,
                    mut tx_recipients,
                    mut tx_input_objects,
                    mut tx_changed_objects,
                    mut tx_pkgs,
                    mut tx_mods,
                    mut tx_funs,
                    mut tx_digests,
                    mut tx_kinds,
                ),
                 index| {
                    tx_senders.extend(index.0);
                    tx_recipients.extend(index.1);
                    tx_input_objects.extend(index.2);
                    tx_changed_objects.extend(index.3);
                    tx_pkgs.extend(index.4);
                    tx_mods.extend(index.5);
                    tx_funs.extend(index.6);
                    tx_digests.extend(index.7);
                    tx_kinds.extend(index.8);
                    (
                        tx_senders,
                        tx_recipients,
                        tx_input_objects,
                        tx_changed_objects,
                        tx_pkgs,
                        tx_mods,
                        tx_funs,
                        tx_digests,
                        tx_kinds,
                    )
                },
            );

        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let senders_len = senders.len();
            let recipients_len = recipients.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_senders::table, chunk, conn);
                    }
                    for chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_recipients::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to tx_senders and {} rows to tx_recipients",
                    senders_len,
                    recipients_len,
                );
            })
            .tap_err(|e| {
                tracing::error!(
                    "Failed to persist tx_senders and tx_recipients with error: {}",
                    e
                );
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let input_objects_len = input_objects.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_input_objects::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to tx_input_objects", input_objects_len,
                );
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_input_objects with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let changed_objects_len = changed_objects.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_changed_objects::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to tx_changed_objects table", changed_objects_len,
                );
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_changed_objects with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = pkgs.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_calls_pkg::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_calls_pkg table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_calls_pkg with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = mods.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_calls_mod::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_calls_mod table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_calls_mod with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = funs.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_calls_fun::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_calls_fun table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_calls_fun with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = digests.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_digests::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                Duration::from_secs(60)
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_digests table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_digests with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = kinds.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_kinds::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                Duration::from_secs(60)
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_kinds table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_kinds with error: {}", e);
            })
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx indices in a chunk: {:?}",
                    e
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked tx_indices", len);
        Ok(())
    }

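    /// Persists epoch data: if a previous epoch is present, backfills its
    /// end-of-epoch information (recomputing `epoch_total_transactions` from
    /// the checkpoints indexed so far), then inserts the new epoch's beginning
    /// info.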
    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                if let Some(last_epoch) = &epoch.last_epoch {
                    let last_epoch_id = last_epoch.epoch;
                    // Overwrite the `epoch_total_transactions` field on `epoch.last_epoch`,
                    // because we are not guaranteed to have the latest data in the db when
                    // it is set on the indexer's chain-reading side. By the time we
                    // `persist_epoch`, however, the checkpoints from an epoch ago must
                    // have been indexed.
                    let previous_epoch_network_total_transactions = match epoch_id {
                        0 | 1 => 0,
                        _ => {
                            let prev_epoch_id = epoch_id - 2;
                            let result = checkpoints::table
                                .filter(checkpoints::epoch.eq(prev_epoch_id as i64))
                                .select(max(checkpoints::network_total_transactions))
                                .first::<Option<i64>>(conn)
                                .map(|o| o.unwrap_or(0))?;

                            result as u64
                        }
                    };

                    let epoch_total_transactions = epoch.network_total_transactions
                        - previous_epoch_network_total_transactions;

                    let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
                    last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64);
                    info!(last_epoch_id, "Persisting epoch end data.");
                    on_conflict_do_update!(
                        epochs::table,
                        vec![last_epoch],
                        epochs::epoch,
                        (
                            // Note: we exclude epoch beginning info (except system_state)
                            // below, to ensure that epoch beginning info columns are not
                            // overridden with default values, because these columns hold
                            // default values in `last_epoch`.
                            epochs::system_state.eq(excluded(epochs::system_state)),
                            epochs::epoch_total_transactions
                                .eq(excluded(epochs::epoch_total_transactions)),
                            epochs::last_checkpoint_id.eq(excluded(epochs::last_checkpoint_id)),
                            epochs::epoch_end_timestamp.eq(excluded(epochs::epoch_end_timestamp)),
                            epochs::storage_charge.eq(excluded(epochs::storage_charge)),
                            epochs::storage_rebate.eq(excluded(epochs::storage_rebate)),
                            epochs::total_gas_fees.eq(excluded(epochs::total_gas_fees)),
                            epochs::total_stake_rewards_distributed
                                .eq(excluded(epochs::total_stake_rewards_distributed)),
                            epochs::epoch_commitments.eq(excluded(epochs::epoch_commitments)),
                            epochs::burnt_tokens_amount.eq(excluded(epochs::burnt_tokens_amount)),
                            epochs::minted_tokens_amount.eq(excluded(epochs::minted_tokens_amount)),
                        ),
                        conn
                    );
                }

                let epoch_id = epoch.new_epoch.epoch;
                info!(epoch_id, "Persisting epoch beginning info");
                let new_epoch = StoredEpochInfo::from_epoch_beginning_info(&epoch.new_epoch);
                insert_or_ignore_into!(epochs::table, new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist epoch with error: {}", e);
        })
    }

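    /// Advances the partitions of epoch-partitioned tables to the new epoch,
    /// based on the last epoch's data read back from the database.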
    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
        // If there is no last epoch, partition_0 has already been created, so there is
        // nothing to advance.
        if let Some(last_epoch_id) = last_epoch_id {
            let last_db_epoch: Option<StoredEpochInfo> =
                read_only_blocking!(&self.blocking_cp, |conn| {
                    epochs::table
                        .filter(epochs::epoch.eq(last_epoch_id as i64))
                        .first::<StoredEpochInfo>(conn)
                        .optional()
                })
                .context("Failed to read last epoch from PostgresDB")?;
            if let Some(last_epoch) = last_db_epoch {
                let epoch_partition_data =
                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
                let table_partitions = self.partition_manager.get_table_partitions()?;
                for (table, (_, last_partition)) in table_partitions {
                    // Only advance the epoch partition for epoch-partitioned tables.
                    if !self
                        .partition_manager
                        .get_strategy(&table)
                        .is_epoch_partitioned()
                    {
                        continue;
                    }
                    let guard = self.metrics.advance_epoch_latency.start_timer();
                    self.partition_manager.advance_epoch(
                        table.clone(),
                        last_partition,
                        &epoch_partition_data,
                    )?;
                    let elapsed = guard.stop_and_record();
                    info!(
                        elapsed,
                        "Advanced epoch partition {} for table {}", last_partition, table
                    );
                }
            } else {
                tracing::error!("Last epoch: {} from PostgresDB is None.", last_epoch_id);
            }
        }

        Ok(())
    }

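    /// Deletes the row for the given checkpoint sequence number from the
    /// `checkpoints` table.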
    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_epochs_table(&self, epoch: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(epochs::table.filter(epochs::epoch.eq(epoch as i64)))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("Failed to prune epochs table")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    event_emit_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_emit_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_package table"
                );
                prune_tx_or_event_indice_table!(
                    event_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_senders table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_instantiation,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_instantiation table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_name,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_name table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_package table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    tx_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_senders table"
                );
                prune_tx_or_event_indice_table!(
                    tx_recipients,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_recipients table"
                );
                prune_tx_or_event_indice_table!(
                    tx_input_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_input_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_changed_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_changed_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_pkg,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_pkg table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_mod,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_mod table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_fun,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_fun table"
                );
                prune_tx_or_event_indice_table!(
                    tx_digests,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_digests table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    pruner_cp_watermark::table
                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune pruner_cp_watermark table")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<u64, IndexerError> {
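        // Picks the last checkpoint of the epoch; roughly equivalent SQL:
        //   SELECT network_total_transactions FROM checkpoints
        //   WHERE epoch = $1 ORDER BY sequence_number DESC LIMIT 1;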
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::table
                .filter(checkpoints::epoch.eq(epoch as i64))
                .select(checkpoints::network_total_transactions)
                .order_by(checkpoints::sequence_number.desc())
                .first::<i64>(conn)
        })
        .context("Failed to get network total transactions in epoch")
        .map(|v| v as u64)
    }

    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("Failed to refresh participation_metrics: {e}");
        })
    }

    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            f(this)
        })
        .await
        .map_err(Into::into)
        .and_then(std::convert::identity)
    }
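    // Usage sketch for `execute_in_blocking_worker` above (illustrative;
    // `store` is assumed to be a `PgIndexerStore`):
    //
    //     let latest = store
    //         .execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
    //         .await?;
    //
    // The closure runs on tokio's blocking thread pool, so the synchronous
    // diesel calls inside it do not stall the async executor; the enclosing
    // tracing span is propagated into the worker thread.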

    fn spawn_blocking_task<F, R>(
        &self,
        f: F,
    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            let _elapsed = guard.stop_and_record();
            f(this)
        })
    }

    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        tokio::task::spawn(async move { f(this).await })
    }
}

#[async_trait]
impl IndexerStore for PgIndexerStore {
    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
            .await
    }

    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
            .await
    }

    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
            .await
    }

    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
            .await
    }

    async fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| {
            this.get_latest_object_snapshot_checkpoint_sequence_number()
        })
        .await
    }

    async fn persist_objects(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects
            .start_timer();
        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let object_mutations = indexed_mutations
            .into_iter()
            .map(StoredObject::from)
            .collect::<Vec<_>>();
        let object_deletions = indexed_deletions
            .into_iter()
            .map(StoredDeletedObject::from)
            .collect::<Vec<_>>();
        let mutation_len = object_mutations.len();
        let deletion_len = object_deletions.len();

        let object_mutation_chunks =
            chunk!(object_mutations, self.config.parallel_objects_chunk_size);
        let object_deletion_chunks =
            chunk!(object_deletions, self.config.parallel_objects_chunk_size);
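        // `chunk!` splits a Vec into a Vec<Vec<_>> with at most
        // `parallel_objects_chunk_size` elements per inner Vec, e.g. five
        // mutations with a chunk size of 2 become [[m1, m2], [m3, m4], [m5]],
        // so each chunk can be committed by its own blocking task below.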
        let mutation_futures = object_mutation_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_mutation_chunk(c)));
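        // Error handling below is two-layered: `try_join_all` surfaces the
        // first JoinError (a panicked or cancelled task), while the inner
        // collect() surfaces the first per-chunk DB error. The same pattern
        // recurs in the other persist_* methods of this impl.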
        futures::future::try_join_all(mutation_futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join persist_object_mutation_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all object mutation chunks: {:?}",
                    e
                ))
            })?;
        let deletion_futures = object_deletion_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_deletion_chunk(c)));
        futures::future::try_join_all(deletion_futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join persist_object_deletion_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all object deletion chunks: {:?}",
                    e
                ))
            })?;

        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
        );
        Ok(())
    }

    async fn persist_objects_snapshot(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot
            .start_timer();
        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let objects_snapshot = indexed_mutations
            .into_iter()
            .map(StoredObjectSnapshot::from)
            .chain(
                indexed_deletions
                    .into_iter()
                    .map(StoredObjectSnapshot::from),
            )
            .collect::<Vec<_>>();
        let len = objects_snapshot.len();
        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join backfill_objects_snapshot_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects snapshot chunks: {:?}",
                    e
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} object snapshots", len);
        Ok(())
    }

    async fn persist_object_history(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
            .map(|val| val.eq_ignore_ascii_case("true"))
            .unwrap_or(false);
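        // e.g. running the indexer with `SKIP_OBJECT_HISTORY=true` (matched
        // case-insensitively) disables all writes to objects_history.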
        if skip_history {
            info!("skipping object history");
            return Ok(());
        }

        if object_changes.is_empty() {
            return Ok(());
        }
        let objects = make_objects_history_to_commit(object_changes);
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history
            .start_timer();

        let len = objects.len();
        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join persist_objects_history_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects history chunks: {:?}",
                    e
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects history", len);
        Ok(())
    }

    async fn persist_object_versions(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        if object_versions.is_empty() {
            return Ok(());
        }
        let object_versions_count = object_versions.len();

        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
            .collect::<Vec<_>>();

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_object_version_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all object version chunks: {:?}",
                    e
                ))
            })?;
        info!("Persisted {} object versions", object_versions_count);
        Ok(())
    }

    async fn persist_checkpoints(
        &self,
        checkpoints: Vec<IndexedCheckpoint>,
    ) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
            .await
    }

    async fn persist_transactions(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions
            .start_timer();
        let len = transactions.len();

        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_transactions_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all transactions chunks: {:?}",
                    e
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} transactions", len);
        Ok(())
    }

    async fn persist_optimistic_transaction(
        &self,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        let insertion_order = transaction.insertion_order;

        self.spawn_blocking_task(move |this| {
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_err(|e| {
                tracing::error!("Failed to persist optimistic transaction with error: {}", e);
            })
        })
        .await
        .map_err(|e| {
            IndexerError::PostgresWrite(format!(
                "Failed to persist optimistic transaction: {:?}",
                e
            ))
        })??;

        info!("Persisted optimistic transaction {insertion_order}");
        Ok(())
    }

    async fn persist_tx_insertion_order(
        &self,
        tx_order: Vec<TxInsertionOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks.into_iter().map(|c| {
            self.spawn_blocking_task(move |this| this.persist_tx_insertion_order_chunk(c))
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_tx_insertion_order_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all txs insertion order chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {len} txs insertion orders");
        Ok(())
    }

    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        if events.is_empty() {
            return Ok(());
        }
        let len = events.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events
            .start_timer();
        let chunks = chunk!(events, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_events_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {:?}", e))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} events", len);
        Ok(())
    }

    async fn persist_optimistic_events(
        &self,
        events: Vec<OptimisticEvent>,
    ) -> Result<(), IndexerError> {
        if events.is_empty() {
            return Ok(());
        }

        self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(optimistic_events::table, events, &this.blocking_cp)
        })
        .await
        .map_err(|e| {
            tracing::error!(
                "Failed to join persist_chunk_into_table in persist_optimistic_events: {e}"
            );
            IndexerError::from(e)
        })?
        .map_err(|e| {
            IndexerError::PostgresWrite(format!(
                "Failed to persist all optimistic events chunks: {:?}",
                e
            ))
        })
    }

    async fn persist_displays(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
            .await?
    }

    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
            .await
    }

    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_event_indices_chunk(chunk).await
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event_indices chunks: {:?}",
                    e
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} event_indices", len);
        Ok(())
    }

    async fn persist_optimistic_event_indices(
        &self,
        indices: OptimisticEventIndices,
    ) -> Result<(), IndexerError> {
        let OptimisticEventIndices {
            optimistic_event_emit_packages,
            optimistic_event_emit_modules,
            optimistic_event_senders,
            optimistic_event_struct_packages,
            optimistic_event_struct_modules,
            optimistic_event_struct_names,
            optimistic_event_struct_instantiations,
        } = indices;

        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_event_emit_package::table,
                optimistic_event_emit_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_event_emit_module::table,
                optimistic_event_emit_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_event_senders::table,
                optimistic_event_senders,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_event_struct_package::table,
                optimistic_event_struct_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_event_struct_module::table,
                optimistic_event_struct_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_event_struct_name::table,
                optimistic_event_struct_names,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_event_struct_instantiation::table,
                optimistic_event_struct_instantiations,
                &this.blocking_cp
            )
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join optimistic event indices futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all optimistic event indices: {e:?}"
                ))
            })?;
        info!("Persisted optimistic event indices");
        Ok(())
    }

    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(
                move |this: Self| async move { this.persist_tx_indices_chunk(chunk).await },
            )
        });
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_tx_indices_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx_indices chunks: {:?}",
                    e
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} tx_indices", len);
        Ok(())
    }

    async fn persist_optimistic_tx_indices(
        &self,
        indices: OptimisticTxIndices,
    ) -> Result<(), IndexerError> {
        let OptimisticTxIndices {
            optimistic_tx_senders: senders,
            optimistic_tx_recipients: recipients,
            optimistic_tx_input_objects: input_objects,
            optimistic_tx_changed_objects: changed_objects,
            optimistic_tx_pkgs: pkgs,
            optimistic_tx_mods: mods,
            optimistic_tx_funs: funs,
            optimistic_tx_kinds: kinds,
        } = indices;

        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(optimistic_tx_senders::table, senders, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_tx_recipients::table,
                recipients,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_tx_input_objects::table,
                input_objects,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                optimistic_tx_changed_objects::table,
                changed_objects,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(optimistic_tx_calls_pkg::table, pkgs, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(optimistic_tx_calls_mod::table, mods, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(optimistic_tx_calls_fun::table, funs, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(optimistic_tx_kinds::table, kinds, &this.blocking_cp)
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join optimistic tx indices futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all optimistic tx indices: {e:?}"
                ))
            })?;
        info!("Persisted optimistic tx indices");
        Ok(())
    }

    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
            .await
    }

    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
            .await
    }

    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
            _ => Err(IndexerError::PostgresRead(format!(
                "Failed to get checkpoint range for epoch {}",
                epoch
            ))),
        }?;

        // NOTE: for disaster recovery, min_cp is the min cp of the current epoch,
        // which is likely partially pruned already; min_prunable_cp is the min cp
        // still to be pruned. By taking std::cmp::max, we resume pruning from the
        // next unpruned checkpoint instead of the first cp of the current epoch.
        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
        min_cp = std::cmp::max(min_cp, min_prunable_cp);
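        // e.g. if this epoch starts at cp 100 but cps 100..=149 were already
        // pruned before a restart (min_prunable_cp = 150), pruning resumes at
        // cp 150 instead of redoing 100..=149.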
        for cp in min_cp..=max_cp {
            // NOTE: the order of pruning tables is crucial:
            // 1. prune the checkpoints table first: it is the source table of the
            //    available range, so pruning it first guarantees that we always
            //    have full data for checkpoints within the available range;
            // 2. then prune the tx_* tables;
            // 3. then prune the pruner_cp_watermark table, which is the checkpoint
            //    pruning watermark table and also the source of the tx sequence
            //    range of a checkpoint when pruning the tx_* tables;
            // 4. lastly, prune the epochs table once all checkpoints of the epoch
            //    have been pruned.
            info!(
                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
                cp, epoch, min_prunable_cp
            );
            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("Failed to prune checkpoint {}: {}", cp, e);
                });

            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
            self.execute_in_blocking_worker(move |this| {
                this.prune_tx_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("Failed to prune transactions for cp {}: {}", cp, e);
            });
            info!(
                "Pruned transactions for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.execute_in_blocking_worker(move |this| {
                this.prune_event_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!(
                    "Failed to prune events of transactions for cp {}: {}",
                    cp,
                    e
                );
            });
            info!(
                "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.metrics.last_pruned_transaction.set(max_tx as i64);

            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!(
                        "Failed to prune pruner_cp_watermark table for cp {}: {}",
                        cp,
                        e
                    );
                });
            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
            self.metrics.last_pruned_checkpoint.set(cp as i64);
        }

        // NOTE: prune the epochs table last, otherwise get_checkpoint_range_for_epoch
        // would fail.
        self.execute_in_blocking_worker(move |this| this.prune_epochs_table(epoch))
            .await
            .unwrap_or_else(|e| {
                tracing::error!("Failed to prune epoch table for epoch {}: {}", epoch, e);
            });
        Ok(())
    }

    async fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<u64, IndexerError> {
        self.execute_in_blocking_worker(move |this| {
            this.get_network_total_transactions_by_end_of_epoch(epoch)
        })
        .await
    }

    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
            .await
    }

    fn as_any(&self) -> &dyn StdAny {
        self
    }

    /// Persist protocol configs and feature flags up to and including the
    /// protocol version of the latest epoch stored in the db.
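    /// For example, if the index range is versions 1..=3, the configs and
    /// feature flags of protocol versions 1, 2, and 3 are all written.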
    fn persist_protocol_configs_and_feature_flags(
        &self,
        chain_id: Vec<u8>,
    ) -> Result<(), IndexerError> {
        let chain_id = ChainIdentifier::from(
            CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
        );

        let mut all_configs = vec![];
        let mut all_flags = vec![];

        let (start_version, end_version) = self.get_protocol_version_index_range()?;
        info!(
            "Persisting protocol configs with start_version: {}, end_version: {}",
            start_version, end_version
        );

        // Gather the protocol configs and feature flags for every version between
        // start and end.
        for version in start_version..=end_version {
            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
                (version as u64).into(),
                chain_id.chain(),
            )
            .ok_or_else(|| {
                IndexerError::Generic(format!(
                    "Unable to fetch protocol version {} and chain {:?}",
                    version,
                    chain_id.chain()
                ))
            })?;
            let configs_vec = protocol_configs
                .attr_map()
                .into_iter()
                .map(|(k, v)| StoredProtocolConfig {
                    protocol_version: version,
                    config_name: k,
                    config_value: v.map(|v| v.to_string()),
                })
                .collect::<Vec<_>>();
            all_configs.extend(configs_vec);

            let feature_flags = protocol_configs
                .feature_map()
                .into_iter()
                .map(|(k, v)| StoredFeatureFlag {
                    protocol_version: version,
                    flag_name: k,
                    flag_value: v,
                })
                .collect::<Vec<_>>();
            all_flags.extend(feature_flags);
        }

        // Now insert all of them into the db.
        // TODO: right now the size of these updates is manageable, but later we
        // may consider batching.
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                insert_or_ignore_into!(protocol_configs::table, all_configs.clone(), conn);
                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;
        Ok(())
    }
}

fn make_objects_history_to_commit(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Vec<StoredHistoryObject> {
    // Collect deletions first and mutations second, without cloning the whole
    // change set.
    let mut deleted_objects: Vec<StoredHistoryObject> = vec![];
    let mut mutated_objects: Vec<StoredHistoryObject> = vec![];
    for changes in tx_object_changes {
        deleted_objects.extend(changes.deleted_objects.into_iter().map(Into::into));
        mutated_objects.extend(changes.changed_objects.into_iter().map(Into::into));
    }
    deleted_objects.extend(mutated_objects);
    deleted_objects
}

/// Partition object changes into deletions and mutations. Within each
/// partition, retain only the entry with the highest version per object; when
/// an object appears in both partitions, keep only the entry with the higher
/// version. This is necessary because after this step the DB commit is done
/// in parallel and not in order.
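/// For example, if object `A` is mutated at version 1 and then deleted at
/// version 2, while object `B` is mutated at version 3, the result is
/// mutations = [B@v3] and deletions = [A@v2].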
fn retain_latest_indexed_objects(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
    // Only the last deleted / mutated entry per object ends up in the maps,
    // because tx_object_changes are in order and versions always increment.
    let (mutations, deletions) = tx_object_changes
        .into_iter()
        .flat_map(|change| {
            change
                .changed_objects
                .into_iter()
                .map(Either::Left)
                .chain(change.deleted_objects.into_iter().map(Either::Right))
        })
        .fold(
            (
                HashMap::<ObjectID, IndexedObject>::new(),
                HashMap::<ObjectID, IndexedDeletedObject>::new(),
            ),
            |(mut mutations, mut deletions), either_change| {
                match either_change {
                    // A later mutation / deletion replaces an earlier deletion /
                    // mutation, because the later change always has a higher
                    // version. Strictly speaking the assertions below are not
                    // required; they are kept as a sanity check.
                    Either::Left(mutation) => {
                        let id = mutation.object.id();
                        let mutation_version = mutation.object.version();
                        if let Some(existing) = deletions.remove(&id) {
                            assert!(
                                existing.object_version < mutation_version.value(),
                                "Mutation version ({mutation_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                                existing.object_version
                            );
                        }
                        if let Some(existing) = mutations.insert(id, mutation) {
                            assert!(
                                existing.object.version() < mutation_version,
                                "Mutation version ({mutation_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                                existing.object.version()
                            );
                        }
                    }
                    Either::Right(deletion) => {
                        let id = deletion.object_id;
                        let deletion_version = deletion.object_version;
                        if let Some(existing) = mutations.remove(&id) {
                            assert!(
                                existing.object.version().value() < deletion_version,
                                "Deletion version ({deletion_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                                existing.object.version(),
                            );
                        }
                        if let Some(existing) = deletions.insert(id, deletion) {
                            assert!(
                                existing.object_version < deletion_version,
                                "Deletion version ({deletion_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                                existing.object_version
                            );
                        }
                    }
                }
                (mutations, deletions)
            },
        );
    (
        mutations.into_values().collect(),
        deletions.into_values().collect(),
    )
}