iota_indexer/store/pg_indexer_store.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use core::result::Result::Ok;
use std::{
    any::Any as StdAny,
    collections::{BTreeMap, HashMap},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use diesel::{
    ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
    dsl::{max, min},
    upsert::excluded,
};
use downcast::Any;
use futures::future::Either;
use iota_protocol_config::ProtocolConfig;
use iota_types::{
    base_types::ObjectID,
    digests::{ChainIdentifier, CheckpointDigest},
};
use itertools::Itertools;
use tap::TapFallible;
use tracing::info;

use super::{
    IndexerStore,
    pg_partition_manager::{EpochPartitionData, PgPartitionManager},
};
use crate::{
    db::ConnectionPool,
    errors::{Context, IndexerError},
    handlers::{EpochToCommit, TransactionObjectChangesToCommit},
    insert_or_ignore_into,
    metrics::IndexerMetrics,
    models::{
        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
        display::StoredDisplay,
        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
        event_indices::OptimisticEventIndices,
        events::{OptimisticEvent, StoredEvent},
        obj_indices::StoredObjectVersion,
        objects::{StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot},
        packages::StoredPackage,
        transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
        tx_indices::OptimisticTxIndices,
    },
    on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
    schema::{
        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
        objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
        optimistic_event_senders, optimistic_event_struct_instantiation,
        optimistic_event_struct_module, optimistic_event_struct_name,
        optimistic_event_struct_package, optimistic_events, optimistic_transactions,
        optimistic_tx_calls_fun, optimistic_tx_calls_mod, optimistic_tx_calls_pkg,
        optimistic_tx_changed_objects, optimistic_tx_input_objects, optimistic_tx_kinds,
        optimistic_tx_recipients, optimistic_tx_senders, packages, protocol_configs,
        pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
        tx_changed_objects, tx_digests, tx_input_objects, tx_insertion_order, tx_kinds,
        tx_recipients, tx_senders,
    },
    transactional_blocking_with_retry,
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, TxIndex,
    },
};

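/// Splits `$data` into a `Vec<Vec<_>>` of chunks holding at most `$size`
/// items each.
///
/// A minimal usage sketch (illustrative only):
/// ```ignore
/// let batches: Vec<Vec<u32>> = chunk!(vec![1, 2, 3, 4, 5], 2);
/// assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
/// ```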
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}

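/// Deletes all rows of `$table` whose `tx_sequence_number` falls in the
/// inclusive range `[$min_tx, $max_tx]`, mapping any failure to an
/// `IndexerError` with the given context message.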
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}

// Within one DB transaction, an update may be chunked into several
// statements; this is the number of rows to update per statement.
// TODO: with the `per_db_tx` params, `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` may
// now be less relevant. We should run experiments and remove it if confirmed.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
// The number of rows to update in one DB transaction.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
// The number of rows to update in one DB transaction, specifically for
// objects. Setting this too high may cause frequent DB deadlocks because of
// optimistic locking.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);

#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    pub parallel_chunk_size: usize,
    pub parallel_objects_chunk_size: usize,
}

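/// Postgres-backed implementation of an indexer store, built around a
/// blocking connection pool, commit metrics, and an epoch partition manager.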
pub struct PgIndexerStore {
    blocking_cp: ConnectionPool,
    metrics: IndexerMetrics,
    partition_manager: PgPartitionManager,
    config: PgIndexerStoreConfig,
}

impl Clone for PgIndexerStore {
    fn clone(&self) -> PgIndexerStore {
        Self {
            blocking_cp: self.blocking_cp.clone(),
            metrics: self.metrics.clone(),
            partition_manager: self.partition_manager.clone(),
            config: self.config.clone(),
        }
    }
}

impl PgIndexerStore {
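    /// Creates a new store. The parallel chunk sizes default to
    /// `PG_COMMIT_PARALLEL_CHUNK_SIZE` and
    /// `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE`, and can be overridden via
    /// environment variables of the same names; panics if an override does
    /// not parse as `usize`.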
    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
            .expect("Failed to initialize partition manager");
        let config = PgIndexerStoreConfig {
            parallel_chunk_size,
            parallel_objects_chunk_size,
        };

        Self {
            blocking_cp,
            metrics,
            partition_manager,
            config,
        }
    }

    pub fn blocking_cp(&self) -> ConnectionPool {
        self.blocking_cp.clone()
    }

    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::epoch))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest epoch id from PostgresDB")
    }

    /// Get the range of the protocol versions that need to be indexed.
    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        // We start indexing from the next protocol version after the latest one stored
        // in the db.
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        // We end indexing at the protocol version of the latest epoch stored in the db.
        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }

    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }

    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select(max(checkpoints::sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest checkpoint sequence number from PostgresDB")
    }

    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select((
                    min(checkpoints::sequence_number),
                    max(checkpoints::sequence_number),
                ))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
    }

    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((min(epochs::epoch), max(epochs::epoch)))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max epoch numbers from PostgresDB")
    }

    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.unwrap_or_default() as u64)
        })
        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
    }

    fn get_checkpoint_range_for_epoch(
        &self,
        epoch: u64,
    ) -> Result<(u64, Option<u64>), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
                .filter(epochs::epoch.eq(epoch as i64))
                .first::<(i64, Option<i64>)>(conn)
                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
        })
        .context("Failed reading checkpoint range from PostgresDB")
    }

    fn get_transaction_range_for_checkpoint(
        &self,
        checkpoint: u64,
    ) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select((
                    pruner_cp_watermark::min_tx_sequence_number,
                    pruner_cp_watermark::max_tx_sequence_number,
                ))
                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
                .first::<(i64, i64)>(conn)
                .map(|(min, max)| (min as u64, max as u64))
        })
        .context("Failed reading transaction range from PostgresDB")
    }

    fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            objects_snapshot::dsl::objects_snapshot
                .select(max(objects_snapshot::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
    }

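    /// Upserts `Display` updates keyed by `object_type`, overwriting the
    /// stored id, version, and BCS bytes on conflict.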
    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update!(
                    display::table,
                    display_updates.values().collect::<Vec<_>>(),
                    display::object_type,
                    (
                        display::id.eq(excluded(display::id)),
                        display::version.eq(excluded(display::version)),
                        display::bcs.eq(excluded(display::bcs)),
                    ),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }

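    /// Upserts one chunk of mutated objects into the live `objects` table,
    /// replacing every column of an existing row with the same `object_id`.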
    fn persist_object_mutation_chunk(
        &self,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = mutated_object_mutation_chunk.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update!(
                    objects::table,
                    mutated_object_mutation_chunk.clone(),
                    objects::object_id,
                    (
                        objects::object_id.eq(excluded(objects::object_id)),
                        objects::object_version.eq(excluded(objects::object_version)),
                        objects::object_digest.eq(excluded(objects::object_digest)),
                        objects::owner_type.eq(excluded(objects::owner_type)),
                        objects::owner_id.eq(excluded(objects::owner_id)),
                        objects::object_type.eq(excluded(objects::object_type)),
                        objects::serialized_object.eq(excluded(objects::serialized_object)),
                        objects::coin_type.eq(excluded(objects::coin_type)),
                        objects::coin_balance.eq(excluded(objects::coin_balance)),
                        objects::df_kind.eq(excluded(objects::df_kind)),
                    ),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked objects", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object mutations with error: {}", e);
        })
    }

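    /// Deletes one chunk of objects from the live `objects` table by
    /// `object_id`.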
    fn persist_object_deletion_chunk(
        &self,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = deleted_objects_chunk.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    objects::table.filter(
                        objects::object_id.eq_any(
                            deleted_objects_chunk
                                .iter()
                                .map(|o| o.object_id.clone())
                                .collect::<Vec<_>>(),
                        ),
                    ),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to write object deletion to PostgresDB")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {} chunked objects", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object deletions with error: {}", e);
        })
    }

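    /// Upserts one chunk of rows into `objects_snapshot`, split into
    /// sub-batches of `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` within a single
    /// transaction.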
    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object snapshot with error: {}", e);
        })
    }

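    /// Inserts one chunk of rows into `objects_history`, ignoring conflicts.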
    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object history with error: {}", e);
        })
    }

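    /// Inserts one chunk of rows into `objects_version`, ignoring conflicts.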
    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object version chunk with error: {}", e);
        })
    }

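    /// Persists checkpoints together with their pruner watermarks. For the
    /// genesis checkpoint (sequence number 0), it also persists the chain
    /// identifier as well as protocol configs and feature flags.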
    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        // If the first checkpoint has sequence number 0, we need to persist the digest
        // as chain identifier.
        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics
                            .max_committed_checkpoint_sequence_number
                            .set(stored_checkpoint.sequence_number);
                        self.metrics
                            .committed_checkpoint_timestamp_ms
                            .set(stored_checkpoint.timestamp_ms);
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!(
                            "Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}",
                            stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms
                        );
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist checkpoints with error: {}", e);
        })
    }

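    /// Converts one chunk of indexed transactions into their stored form and
    /// inserts them into the `transactions` table, ignoring conflicts.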
    fn persist_transactions_chunk(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks
            .start_timer();
        let transformation_guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks_transformation
            .start_timer();
        let transactions = transactions
            .iter()
            .map(StoredTransaction::from)
            .collect::<Vec<_>>();
        drop(transformation_guard);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked transactions",
                transactions.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist transactions with error: {}", e);
        })
    }

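    /// Inserts one chunk of rows into `tx_insertion_order`, ignoring
    /// conflicts.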
    fn persist_tx_insertion_order_chunk(
        &self,
        tx_order: Vec<TxInsertionOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(tx_insertion_order::table, tx_order_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist txs insertion order with error: {e}");
        })
    }

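    /// Converts one chunk of indexed events into their stored form and
    /// inserts them into the `events` table, ignoring conflicts.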
    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events_chunks
            .start_timer();
        let len = events.len();
        let events = events
            .into_iter()
            .map(StoredEvent::from)
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(events::table, event_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked events", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist events with error: {}", e);
        })
    }

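    /// Upserts packages keyed by `package_id`, overwriting the stored Move
    /// package bytes on conflict.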
    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_packages
            .start_timer();
        let packages = packages
            .into_iter()
            .map(StoredPackage::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update!(
                        packages::table,
                        packages_chunk,
                        packages::package_id,
                        (
                            packages::package_id.eq(excluded(packages::package_id)),
                            packages::move_package.eq(excluded(packages::move_package)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} packages", packages.len());
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist packages with error: {}", e);
        })
    }

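    /// Splits one chunk of event indices into their per-table components and
    /// persists each component table in parallel on blocking tasks.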
    async fn persist_event_indices_chunk(
        &self,
        indices: Vec<EventIndex>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices_chunks
            .start_timer();
        let len = indices.len();
        let (
            event_emit_packages,
            event_emit_modules,
            event_senders,
            event_struct_packages,
            event_struct_modules,
            event_struct_names,
            event_struct_instantiations,
        ) = indices.into_iter().map(|i| i.split()).fold(
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            ),
            |(
                mut event_emit_packages,
                mut event_emit_modules,
                mut event_senders,
                mut event_struct_packages,
                mut event_struct_modules,
                mut event_struct_names,
                mut event_struct_instantiations,
            ),
             index| {
                event_emit_packages.push(index.0);
                event_emit_modules.push(index.1);
                event_senders.push(index.2);
                event_struct_packages.push(index.3);
                event_struct_modules.push(index.4);
                event_struct_names.push(index.5);
                event_struct_instantiations.push(index.6);
                (
                    event_emit_packages,
                    event_emit_modules,
                    event_senders,
                    event_struct_packages,
                    event_struct_modules,
                    event_struct_names,
                    event_struct_instantiations,
                )
            },
        );

        // Now persist all the event indices in parallel into their tables.
        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_package::table,
                event_emit_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_module::table,
                event_emit_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_package::table,
                event_struct_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_module::table,
                event_struct_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_name::table,
                event_struct_names,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_instantiation::table,
                event_struct_instantiations,
                &this.blocking_cp
            )
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join event indices futures in a chunk: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked event indices", len);
        Ok(())
    }

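    /// Splits one chunk of transaction indices into their per-table
    /// components and persists each component table in parallel on blocking
    /// tasks.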
    async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices_chunks
            .start_timer();
        let len = indices.len();
        let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
            indices.into_iter().map(|i| i.split()).fold(
                (
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                ),
                |(
                    mut tx_senders,
                    mut tx_recipients,
                    mut tx_input_objects,
                    mut tx_changed_objects,
                    mut tx_pkgs,
                    mut tx_mods,
                    mut tx_funs,
                    mut tx_digests,
                    mut tx_kinds,
                ),
                 index| {
                    tx_senders.extend(index.0);
                    tx_recipients.extend(index.1);
                    tx_input_objects.extend(index.2);
                    tx_changed_objects.extend(index.3);
                    tx_pkgs.extend(index.4);
                    tx_mods.extend(index.5);
                    tx_funs.extend(index.6);
                    tx_digests.extend(index.7);
                    tx_kinds.extend(index.8);
                    (
                        tx_senders,
                        tx_recipients,
                        tx_input_objects,
                        tx_changed_objects,
                        tx_pkgs,
                        tx_mods,
                        tx_funs,
                        tx_digests,
                        tx_kinds,
                    )
                },
            );

        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let senders_len = senders.len();
            let recipients_len = recipients.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_senders::table, chunk, conn);
                    }
                    for chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_recipients::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to tx_senders and {} rows to tx_recipients",
                    senders_len,
                    recipients_len,
                );
            })
            .tap_err(|e| {
                tracing::error!(
                    "Failed to persist tx_senders and tx_recipients with error: {}",
                    e
                );
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let input_objects_len = input_objects.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_input_objects::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to tx_input_objects", input_objects_len,
                );
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_input_objects with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let changed_objects_len = changed_objects.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_changed_objects::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to tx_changed_objects table", changed_objects_len,
                );
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_changed_objects with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = pkgs.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_calls_pkg::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_calls_pkg table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_calls_pkg with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = mods.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_calls_mod::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_calls_mod table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_calls_mod with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = funs.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_calls_fun::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_calls_fun table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_calls_fun with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let digests_len = digests.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_digests::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                Duration::from_secs(60)
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_digests table", digests_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_digests with error: {}", e);
            })
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            let now = Instant::now();
            let rows_len = kinds.len();
            transactional_blocking_with_retry!(
                &this.blocking_cp,
                |conn| {
                    for chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                        insert_or_ignore_into!(tx_kinds::table, chunk, conn);
                    }
                    Ok::<(), IndexerError>(())
                },
                Duration::from_secs(60)
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(elapsed, "Persisted {} rows to tx_kinds table", rows_len);
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist tx_kinds with error: {}", e);
            })
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked tx_indices", len);
        Ok(())
    }

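    /// Persists the beginning info of the new epoch and, if a previous epoch
    /// is provided, updates its row with the end-of-epoch data.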
    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                if let Some(last_epoch) = &epoch.last_epoch {
                    let last_epoch_id = last_epoch.epoch;
                    // Overwrite the `epoch_total_transactions` field of `epoch.last_epoch`,
                    // because the db is not guaranteed to hold the latest data at the time this
                    // value is set on the indexer's chain-reading side. By the time we
                    // `persist_epoch`, however, the checkpoints from an epoch ago must have
                    // been indexed.
                    let previous_epoch_network_total_transactions = match epoch_id {
                        0 | 1 => 0,
                        _ => {
                            let prev_epoch_id = epoch_id - 2;
                            let result = checkpoints::table
                                .filter(checkpoints::epoch.eq(prev_epoch_id as i64))
                                .select(max(checkpoints::network_total_transactions))
                                .first::<Option<i64>>(conn)
                                .map(|o| o.unwrap_or(0))?;

                            result as u64
                        }
                    };

                    let epoch_total_transactions = epoch.network_total_transactions
                        - previous_epoch_network_total_transactions;

                    let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
                    last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64);
                    info!(last_epoch_id, "Persisting epoch end data.");
                    on_conflict_do_update!(
                        epochs::table,
                        vec![last_epoch],
                        epochs::epoch,
                        (
                            // Note: Update only what is not present in epoch beginning info.
                            epochs::epoch_total_transactions
                                .eq(excluded(epochs::epoch_total_transactions)),
                            epochs::last_checkpoint_id.eq(excluded(epochs::last_checkpoint_id)),
                            epochs::epoch_end_timestamp.eq(excluded(epochs::epoch_end_timestamp)),
                            epochs::storage_charge.eq(excluded(epochs::storage_charge)),
                            epochs::storage_rebate.eq(excluded(epochs::storage_rebate)),
                            epochs::total_gas_fees.eq(excluded(epochs::total_gas_fees)),
                            epochs::total_stake_rewards_distributed
                                .eq(excluded(epochs::total_stake_rewards_distributed)),
                            epochs::epoch_commitments.eq(excluded(epochs::epoch_commitments)),
                            epochs::burnt_tokens_amount.eq(excluded(epochs::burnt_tokens_amount)),
                            epochs::minted_tokens_amount.eq(excluded(epochs::minted_tokens_amount)),
                        ),
                        conn
                    );
                }

                let epoch_id = epoch.new_epoch.epoch;
                info!(epoch_id, "Persisting epoch beginning info");
                let new_epoch = StoredEpochInfo::from_epoch_beginning_info(&epoch.new_epoch);
                insert_or_ignore_into!(epochs::table, new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist epoch with error: {}", e);
        })
    }

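    /// Advances the partitions of all epoch-partitioned tables to the new
    /// epoch, based on the last epoch's data stored in the db.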
    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
        // If there is no last epoch, this is genesis: partition_0 has already been
        // created, so there is nothing to advance.
        if let Some(last_epoch_id) = last_epoch_id {
            let last_db_epoch: Option<StoredEpochInfo> =
                read_only_blocking!(&self.blocking_cp, |conn| {
                    epochs::table
                        .filter(epochs::epoch.eq(last_epoch_id as i64))
                        .first::<StoredEpochInfo>(conn)
                        .optional()
                })
                .context("Failed to read last epoch from PostgresDB")?;
            if let Some(last_epoch) = last_db_epoch {
                let epoch_partition_data =
                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
                let table_partitions = self.partition_manager.get_table_partitions()?;
                for (table, (_, last_partition)) in table_partitions {
                    // Only advance the epoch partition for epoch-partitioned tables.
                    if !self
                        .partition_manager
                        .get_strategy(&table)
                        .is_epoch_partitioned()
                    {
                        continue;
                    }
                    let guard = self.metrics.advance_epoch_latency.start_timer();
                    self.partition_manager.advance_epoch(
                        table.clone(),
                        last_partition,
                        &epoch_partition_data,
                    )?;
                    let elapsed = guard.stop_and_record();
                    info!(
                        elapsed,
                        "Advanced epoch partition {} for table {}",
                        last_partition,
                        table.clone()
                    );
                }
            } else {
                tracing::error!("Last epoch {} not found in PostgresDB.", last_epoch_id);
            }
        }

        Ok(())
    }

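    /// Deletes the row for checkpoint `cp` from the `checkpoints` table.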
    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

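    /// Prunes all event index tables for the inclusive transaction range
    /// `[min_tx, max_tx]`.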
1311    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1312        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1313        transactional_blocking_with_retry!(
1314            &self.blocking_cp,
1315            |conn| {
1316                prune_tx_or_event_indice_table!(
1317                    event_emit_module,
1318                    conn,
1319                    min_tx,
1320                    max_tx,
1321                    "Failed to prune event_emit_module table"
1322                );
1323                prune_tx_or_event_indice_table!(
1324                    event_emit_package,
1325                    conn,
1326                    min_tx,
1327                    max_tx,
1328                    "Failed to prune event_emit_package table"
1329                );
1330                prune_tx_or_event_indice_table!(
1331                    event_senders,
1332                    conn,
1333                    min_tx,
1334                    max_tx,
1335                    "Failed to prune event_senders table"
1336                );
1337                prune_tx_or_event_indice_table!(
1338                    event_struct_instantiation,
1339                    conn,
1340                    min_tx,
1341                    max_tx,
1342                    "Failed to prune event_struct_instantiation table"
1343                );
1344                prune_tx_or_event_indice_table!(
1345                    event_struct_module,
1346                    conn,
1347                    min_tx,
1348                    max_tx,
1349                    "Failed to prune event_struct_module table"
1350                );
1351                prune_tx_or_event_indice_table!(
1352                    event_struct_name,
1353                    conn,
1354                    min_tx,
1355                    max_tx,
1356                    "Failed to prune event_struct_name table"
1357                );
1358                prune_tx_or_event_indice_table!(
1359                    event_struct_package,
1360                    conn,
1361                    min_tx,
1362                    max_tx,
1363                    "Failed to prune event_struct_package table"
1364                );
1365                Ok::<(), IndexerError>(())
1366            },
1367            PG_DB_COMMIT_SLEEP_DURATION
1368        )
1369    }
1370
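    // Each `prune_tx_or_event_indice_table!` invocation above issues a range
    // delete keyed on tx_sequence_number. Written out by hand in diesel for a
    // single table, the shape is roughly (a sketch, not additional production
    // code):
    //
    //     diesel::delete(
    //         event_senders::table
    //             .filter(event_senders::tx_sequence_number.between(min_tx, max_tx)),
    //     )
    //     .execute(conn)?;
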
1371    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1372        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1373        transactional_blocking_with_retry!(
1374            &self.blocking_cp,
1375            |conn| {
1376                prune_tx_or_event_indice_table!(
1377                    tx_senders,
1378                    conn,
1379                    min_tx,
1380                    max_tx,
1381                    "Failed to prune tx_senders table"
1382                );
1383                prune_tx_or_event_indice_table!(
1384                    tx_recipients,
1385                    conn,
1386                    min_tx,
1387                    max_tx,
1388                    "Failed to prune tx_recipients table"
1389                );
1390                prune_tx_or_event_indice_table!(
1391                    tx_input_objects,
1392                    conn,
1393                    min_tx,
1394                    max_tx,
1395                    "Failed to prune tx_input_objects table"
1396                );
1397                prune_tx_or_event_indice_table!(
1398                    tx_changed_objects,
1399                    conn,
1400                    min_tx,
1401                    max_tx,
1402                    "Failed to prune tx_changed_objects table"
1403                );
1404                prune_tx_or_event_indice_table!(
1405                    tx_calls_pkg,
1406                    conn,
1407                    min_tx,
1408                    max_tx,
1409                    "Failed to prune tx_calls_pkg table"
1410                );
1411                prune_tx_or_event_indice_table!(
1412                    tx_calls_mod,
1413                    conn,
1414                    min_tx,
1415                    max_tx,
1416                    "Failed to prune tx_calls_mod table"
1417                );
1418                prune_tx_or_event_indice_table!(
1419                    tx_calls_fun,
1420                    conn,
1421                    min_tx,
1422                    max_tx,
1423                    "Failed to prune tx_calls_fun table"
1424                );
1425                prune_tx_or_event_indice_table!(
1426                    tx_digests,
1427                    conn,
1428                    min_tx,
1429                    max_tx,
1430                    "Failed to prune tx_digests table"
1431                );
1432                Ok::<(), IndexerError>(())
1433            },
1434            PG_DB_COMMIT_SLEEP_DURATION
1435        )
1436    }
1437
1438    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1439        transactional_blocking_with_retry!(
1440            &self.blocking_cp,
1441            |conn| {
1442                diesel::delete(
1443                    pruner_cp_watermark::table
1444                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1445                )
1446                .execute(conn)
1447                .map_err(IndexerError::from)
1448                .context("Failed to prune pruner_cp_watermark table")?;
1449                Ok::<(), IndexerError>(())
1450            },
1451            PG_DB_COMMIT_SLEEP_DURATION
1452        )
1453    }
1454
1455    fn get_network_total_transactions_by_end_of_epoch(
1456        &self,
1457        epoch: u64,
1458    ) -> Result<u64, IndexerError> {
1459        read_only_blocking!(&self.blocking_cp, |conn| {
1460            checkpoints::table
1461                .filter(checkpoints::epoch.eq(epoch as i64))
1462                .select(checkpoints::network_total_transactions)
1463                .order_by(checkpoints::sequence_number.desc())
1464                .first::<i64>(conn)
1465        })
1466        .context("Failed to get network total transactions in epoch")
1467        .map(|v| v as u64)
1468    }
1469
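    // The diesel query above corresponds roughly to:
    //
    //     SELECT network_total_transactions
    //     FROM checkpoints
    //     WHERE epoch = $1
    //     ORDER BY sequence_number DESC
    //     LIMIT 1;
    //
    // i.e. the running network-wide transaction total is read off the last
    // checkpoint committed in the requested epoch.
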
1470    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1471        transactional_blocking_with_retry!(
1472            &self.blocking_cp,
1473            |conn| {
1474                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1475                    .execute(conn)?;
1476                Ok::<(), IndexerError>(())
1477            },
1478            PG_DB_COMMIT_SLEEP_DURATION
1479        )
1480        .tap_ok(|_| {
1481            info!("Successfully refreshed participation_metrics");
1482        })
1483        .tap_err(|e| {
1484            tracing::error!("Failed to refresh participation_metrics: {e}");
1485        })
1486    }
1487
1488    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1489    where
1490        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1491        R: Send + 'static,
1492    {
1493        let this = self.clone();
1494        let current_span = tracing::Span::current();
1495        tokio::task::spawn_blocking(move || {
1496            let _guard = current_span.enter();
1497            f(this)
1498        })
1499        .await
1500        .map_err(Into::into)
1501        .and_then(std::convert::identity)
1502    }
1503
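    // Usage sketch: `execute_in_blocking_worker` clones the store onto tokio's
    // blocking thread pool and re-enters the caller's tracing span there, so
    // logs emitted by the closure stay attached to the calling request. The
    // trailing `.and_then(std::convert::identity)` flattens the nested
    // `Result<Result<R, IndexerError>, JoinError>` into a single `Result`:
    //
    //     let latest = store
    //         .execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
    //         .await?;
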
1504    fn spawn_blocking_task<F, R>(
1505        &self,
1506        f: F,
1507    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1508    where
1509        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1510        R: Send + 'static,
1511    {
1512        let this = self.clone();
1513        let current_span = tracing::Span::current();
1514        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1515        tokio::task::spawn_blocking(move || {
1516            let _guard = current_span.enter();
1517            let _elapsed = guard.stop_and_record();
1518            f(this)
1519        })
1520    }
1521
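    // Note that the timer is started before `spawn_blocking` and stopped as
    // the first action inside the closure, so
    // `tokio_blocking_task_wait_latency` measures how long the task sat queued
    // in the blocking pool before a worker thread picked it up, not how long
    // the task itself ran.
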
1522    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1523    where
1524        F: FnOnce(Self) -> Fut + Send + 'static,
1525        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1526        R: Send + 'static,
1527    {
1528        let this = self.clone();
1529        tokio::task::spawn(async move { f(this).await })
1530    }
1531}
1532
1533#[async_trait]
1534impl IndexerStore for PgIndexerStore {
1535    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1536        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1537            .await
1538    }
1539
1540    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1541        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1542            .await
1543    }
1544
1545    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1546        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1547            .await
1548    }
1549
1550    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1551        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1552            .await
1553    }
1554
1555    async fn get_latest_object_snapshot_checkpoint_sequence_number(
1556        &self,
1557    ) -> Result<Option<u64>, IndexerError> {
1558        self.execute_in_blocking_worker(|this| {
1559            this.get_latest_object_snapshot_checkpoint_sequence_number()
1560        })
1561        .await
1562    }
1563
1564    async fn persist_objects(
1565        &self,
1566        object_changes: Vec<TransactionObjectChangesToCommit>,
1567    ) -> Result<(), IndexerError> {
1568        if object_changes.is_empty() {
1569            return Ok(());
1570        }
1571        let guard = self
1572            .metrics
1573            .checkpoint_db_commit_latency_objects
1574            .start_timer();
1575        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1576        let object_mutations = indexed_mutations
1577            .into_iter()
1578            .map(StoredObject::from)
1579            .collect::<Vec<_>>();
1580        let object_deletions = indexed_deletions
1581            .into_iter()
1582            .map(StoredDeletedObject::from)
1583            .collect::<Vec<_>>();
1584        let mutation_len = object_mutations.len();
1585        let deletion_len = object_deletions.len();
1586
1587        let object_mutation_chunks =
1588            chunk!(object_mutations, self.config.parallel_objects_chunk_size);
1589        let object_deletion_chunks =
1590            chunk!(object_deletions, self.config.parallel_objects_chunk_size);
1591        let mutation_futures = object_mutation_chunks
1592            .into_iter()
1593            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_mutation_chunk(c)));
1594        futures::future::try_join_all(mutation_futures)
1595            .await
1596            .map_err(|e| {
1597                tracing::error!(
1598                    "Failed to join persist_object_mutation_chunk futures: {}",
1599                    e
1600                );
1601                IndexerError::from(e)
1602            })?
1603            .into_iter()
1604            .collect::<Result<Vec<_>, _>>()
1605            .map_err(|e| {
1606                IndexerError::PostgresWrite(format!(
1607                    "Failed to persist all object mutation chunks: {e:?}"
1608                ))
1609            })?;
1610        let deletion_futures = object_deletion_chunks
1611            .into_iter()
1612            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_deletion_chunk(c)));
1613        futures::future::try_join_all(deletion_futures)
1614            .await
1615            .map_err(|e| {
1616                tracing::error!(
1617                    "Failed to join persist_object_deletion_chunk futures: {}",
1618                    e
1619                );
1620                IndexerError::from(e)
1621            })?
1622            .into_iter()
1623            .collect::<Result<Vec<_>, _>>()
1624            .map_err(|e| {
1625                IndexerError::PostgresWrite(format!(
1626                    "Failed to persist all object deletion chunks: {e:?}"
1627                ))
1628            })?;
1629
1630        let elapsed = guard.stop_and_record();
1631        info!(
1632            elapsed,
1633            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
1634        );
1635        Ok(())
1636    }
1637
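    // Error-handling pattern shared by `persist_objects` and the batched
    // writers below: `try_join_all` fails fast if any spawned task itself
    // fails (panic or cancellation), which the first `map_err` converts from a
    // `JoinError`; the second pass over the collected results then surfaces
    // the first `IndexerError` returned by any chunk writer as a
    // `PostgresWrite` error.
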
1638    async fn persist_objects_snapshot(
1639        &self,
1640        object_changes: Vec<TransactionObjectChangesToCommit>,
1641    ) -> Result<(), IndexerError> {
1642        if object_changes.is_empty() {
1643            return Ok(());
1644        }
1645        let guard = self
1646            .metrics
1647            .checkpoint_db_commit_latency_objects_snapshot
1648            .start_timer();
1649        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1650        let objects_snapshot = indexed_mutations
1651            .into_iter()
1652            .map(StoredObjectSnapshot::from)
1653            .chain(
1654                indexed_deletions
1655                    .into_iter()
1656                    .map(StoredObjectSnapshot::from),
1657            )
1658            .collect::<Vec<_>>();
1659        let len = objects_snapshot.len();
1660        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1661        let futures = chunks
1662            .into_iter()
1663            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1664
1665        futures::future::try_join_all(futures)
1666            .await
1667            .map_err(|e| {
1668                tracing::error!(
1669                    "Failed to join backfill_objects_snapshot_chunk futures: {}",
1670                    e
1671                );
1672                IndexerError::from(e)
1673            })?
1674            .into_iter()
1675            .collect::<Result<Vec<_>, _>>()
1676            .map_err(|e| {
1677                IndexerError::PostgresWrite(format!(
1678                    "Failed to persist all objects snapshot chunks: {e:?}"
1679                ))
1680            })?;
1681        let elapsed = guard.stop_and_record();
1682        info!(elapsed, "Persisted {} object snapshots", len);
1683        Ok(())
1684    }
1685
1686    async fn persist_object_history(
1687        &self,
1688        object_changes: Vec<TransactionObjectChangesToCommit>,
1689    ) -> Result<(), IndexerError> {
1690        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1691            .map(|val| val.eq_ignore_ascii_case("true"))
1692            .unwrap_or(false);
1693        if skip_history {
1694            info!("Skipping object history");
1695            return Ok(());
1696        }
1697
1698        if object_changes.is_empty() {
1699            return Ok(());
1700        }
1701        let objects = make_objects_history_to_commit(object_changes);
1702        let guard = self
1703            .metrics
1704            .checkpoint_db_commit_latency_objects_history
1705            .start_timer();
1706
1707        let len = objects.len();
1708        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1709        let futures = chunks
1710            .into_iter()
1711            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1712
1713        futures::future::try_join_all(futures)
1714            .await
1715            .map_err(|e| {
1716                tracing::error!(
1717                    "Failed to join persist_objects_history_chunk futures: {}",
1718                    e
1719                );
1720                IndexerError::from(e)
1721            })?
1722            .into_iter()
1723            .collect::<Result<Vec<_>, _>>()
1724            .map_err(|e| {
1725                IndexerError::PostgresWrite(format!(
1726                    "Failed to persist all objects history chunks: {e:?}"
1727                ))
1728            })?;
1729        let elapsed = guard.stop_and_record();
1730        info!(elapsed, "Persisted {} object history rows", len);
1731        Ok(())
1732    }
1733
1734    async fn persist_object_versions(
1735        &self,
1736        object_versions: Vec<StoredObjectVersion>,
1737    ) -> Result<(), IndexerError> {
1738        if object_versions.is_empty() {
1739            return Ok(());
1740        }
1741        let object_versions_count = object_versions.len();
1742
1743        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1744        let futures = chunks
1745            .into_iter()
1746            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1747            .collect::<Vec<_>>();
1748
1749        futures::future::try_join_all(futures)
1750            .await
1751            .map_err(|e| {
1752                tracing::error!("Failed to join persist_object_version_chunk futures: {}", e);
1753                IndexerError::from(e)
1754            })?
1755            .into_iter()
1756            .collect::<Result<Vec<_>, _>>()
1757            .map_err(|e| {
1758                IndexerError::PostgresWrite(format!(
1759                    "Failed to persist all object version chunks: {e:?}"
1760                ))
1761            })?;
1762        info!("Persisted {} object versions", object_versions_count);
1763        Ok(())
1764    }
1765
1766    async fn persist_checkpoints(
1767        &self,
1768        checkpoints: Vec<IndexedCheckpoint>,
1769    ) -> Result<(), IndexerError> {
1770        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1771            .await
1772    }
1773
1774    async fn persist_transactions(
1775        &self,
1776        transactions: Vec<IndexedTransaction>,
1777    ) -> Result<(), IndexerError> {
1778        let guard = self
1779            .metrics
1780            .checkpoint_db_commit_latency_transactions
1781            .start_timer();
1782        let len = transactions.len();
1783
1784        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1785        let futures = chunks
1786            .into_iter()
1787            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1788
1789        futures::future::try_join_all(futures)
1790            .await
1791            .map_err(|e| {
1792                tracing::error!("Failed to join persist_transactions_chunk futures: {}", e);
1793                IndexerError::from(e)
1794            })?
1795            .into_iter()
1796            .collect::<Result<Vec<_>, _>>()
1797            .map_err(|e| {
1798                IndexerError::PostgresWrite(format!(
1799                    "Failed to persist all transactions chunks: {e:?}"
1800                ))
1801            })?;
1802        let elapsed = guard.stop_and_record();
1803        info!(elapsed, "Persisted {} transactions", len);
1804        Ok(())
1805    }
1806
1807    async fn persist_optimistic_transaction(
1808        &self,
1809        transaction: OptimisticTransaction,
1810    ) -> Result<(), IndexerError> {
1811        let insertion_order = transaction.insertion_order;
1812
1813        self.spawn_blocking_task(move |this| {
1814            transactional_blocking_with_retry!(
1815                &this.blocking_cp,
1816                |conn| {
1817                    insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1818                    Ok::<(), IndexerError>(())
1819                },
1820                PG_DB_COMMIT_SLEEP_DURATION
1821            )
1822            .tap_err(|e| {
1823                tracing::error!("Failed to persist optimistic transaction with error: {}", e);
1824            })
1825        })
1826        .await
1827        .map_err(|e| {
1828            IndexerError::PostgresWrite(format!("Failed to persist optimistic transaction: {e:?}"))
1829        })??;
1830
1831        info!("Persisted optimistic transaction {insertion_order}");
1832        Ok(())
1833    }
1834
1835    async fn persist_tx_insertion_order(
1836        &self,
1837        tx_order: Vec<TxInsertionOrder>,
1838    ) -> Result<(), IndexerError> {
1839        let guard = self
1840            .metrics
1841            .checkpoint_db_commit_latency_tx_insertion_order
1842            .start_timer();
1843        let len = tx_order.len();
1844
1845        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
1846        let futures = chunks.into_iter().map(|c| {
1847            self.spawn_blocking_task(move |this| this.persist_tx_insertion_order_chunk(c))
1848        });
1849
1850        futures::future::try_join_all(futures)
1851            .await
1852            .map_err(|e| {
1853                tracing::error!("Failed to join persist_tx_insertion_order_chunk futures: {e}");
1854                IndexerError::from(e)
1855            })?
1856            .into_iter()
1857            .collect::<Result<Vec<_>, _>>()
1858            .map_err(|e| {
1859                IndexerError::PostgresWrite(format!(
1860                    "Failed to persist all tx insertion order chunks: {e:?}"
1861                ))
1862            })?;
1863        let elapsed = guard.stop_and_record();
1864        info!(elapsed, "Persisted {len} tx insertion orders");
1865        Ok(())
1866    }
1867
1868    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1869        if events.is_empty() {
1870            return Ok(());
1871        }
1872        let len = events.len();
1873        let guard = self
1874            .metrics
1875            .checkpoint_db_commit_latency_events
1876            .start_timer();
1877        let chunks = chunk!(events, self.config.parallel_chunk_size);
1878        let futures = chunks
1879            .into_iter()
1880            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1881
1882        futures::future::try_join_all(futures)
1883            .await
1884            .map_err(|e| {
1885                tracing::error!("Failed to join persist_events_chunk futures: {}", e);
1886                IndexerError::from(e)
1887            })?
1888            .into_iter()
1889            .collect::<Result<Vec<_>, _>>()
1890            .map_err(|e| {
1891                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
1892            })?;
1893        let elapsed = guard.stop_and_record();
1894        info!(elapsed, "Persisted {} events", len);
1895        Ok(())
1896    }
1897
1898    async fn persist_optimistic_events(
1899        &self,
1900        events: Vec<OptimisticEvent>,
1901    ) -> Result<(), IndexerError> {
1902        if events.is_empty() {
1903            return Ok(());
1904        }
1905
1906        self.spawn_blocking_task(move |this| {
1907            persist_chunk_into_table!(optimistic_events::table, events, &this.blocking_cp)
1908        })
1909        .await
1910        .map_err(|e| {
1911            tracing::error!(
1912                "Failed to join persist_chunk_into_table in persist_optimistic_events: {e}"
1913            );
1914            IndexerError::from(e)
1915        })?
1916        .map_err(|e| {
1917            IndexerError::PostgresWrite(format!(
1918                "Failed to persist all optimistic events chunks: {e:?}"
1919            ))
1920        })
1921    }
1922
1923    async fn persist_displays(
1924        &self,
1925        display_updates: BTreeMap<String, StoredDisplay>,
1926    ) -> Result<(), IndexerError> {
1927        if display_updates.is_empty() {
1928            return Ok(());
1929        }
1930
1931        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
1932            .await?
1933    }
1934
1935    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1936        if packages.is_empty() {
1937            return Ok(());
1938        }
1939        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
1940            .await
1941    }
1942
1943    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
1944        if indices.is_empty() {
1945            return Ok(());
1946        }
1947        let len = indices.len();
1948        let guard = self
1949            .metrics
1950            .checkpoint_db_commit_latency_event_indices
1951            .start_timer();
1952        let chunks = chunk!(indices, self.config.parallel_chunk_size);
1953
1954        let futures = chunks.into_iter().map(|chunk| {
1955            self.spawn_task(move |this: Self| async move {
1956                this.persist_event_indices_chunk(chunk).await
1957            })
1958        });
1959
1960        futures::future::try_join_all(futures)
1961            .await
1962            .map_err(|e| {
1963                tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e);
1964                IndexerError::from(e)
1965            })?
1966            .into_iter()
1967            .collect::<Result<Vec<_>, _>>()
1968            .map_err(|e| {
1969                IndexerError::PostgresWrite(format!(
1970                    "Failed to persist all event_indices chunks: {e:?}"
1971                ))
1972            })?;
1973        let elapsed = guard.stop_and_record();
1974        info!(elapsed, "Persisted {} event_indices", len);
1975        Ok(())
1976    }
1977
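    // Unlike the chunked writers above, event and tx index chunks go through
    // `spawn_task` with an async chunk writer rather than `spawn_blocking_task`,
    // so each chunk is persisted from an async context instead of occupying a
    // blocking-pool thread for the duration of the write.
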
1978    async fn persist_optimistic_event_indices(
1979        &self,
1980        indices: OptimisticEventIndices,
1981    ) -> Result<(), IndexerError> {
1982        let OptimisticEventIndices {
1983            optimistic_event_emit_packages,
1984            optimistic_event_emit_modules,
1985            optimistic_event_senders,
1986            optimistic_event_struct_packages,
1987            optimistic_event_struct_modules,
1988            optimistic_event_struct_names,
1989            optimistic_event_struct_instantiations,
1990        } = indices;
1991
1992        let mut futures = vec![];
1993        futures.push(self.spawn_blocking_task(move |this| {
1994            persist_chunk_into_table!(
1995                optimistic_event_emit_package::table,
1996                optimistic_event_emit_packages,
1997                &this.blocking_cp
1998            )
1999        }));
2000
2001        futures.push(self.spawn_blocking_task(move |this| {
2002            persist_chunk_into_table!(
2003                optimistic_event_emit_module::table,
2004                optimistic_event_emit_modules,
2005                &this.blocking_cp
2006            )
2007        }));
2008
2009        futures.push(self.spawn_blocking_task(move |this| {
2010            persist_chunk_into_table!(
2011                optimistic_event_senders::table,
2012                optimistic_event_senders,
2013                &this.blocking_cp
2014            )
2015        }));
2016
2017        futures.push(self.spawn_blocking_task(move |this| {
2018            persist_chunk_into_table!(
2019                optimistic_event_struct_package::table,
2020                optimistic_event_struct_packages,
2021                &this.blocking_cp
2022            )
2023        }));
2024
2025        futures.push(self.spawn_blocking_task(move |this| {
2026            persist_chunk_into_table!(
2027                optimistic_event_struct_module::table,
2028                optimistic_event_struct_modules,
2029                &this.blocking_cp
2030            )
2031        }));
2032
2033        futures.push(self.spawn_blocking_task(move |this| {
2034            persist_chunk_into_table!(
2035                optimistic_event_struct_name::table,
2036                optimistic_event_struct_names,
2037                &this.blocking_cp
2038            )
2039        }));
2040
2041        futures.push(self.spawn_blocking_task(move |this| {
2042            persist_chunk_into_table!(
2043                optimistic_event_struct_instantiation::table,
2044                optimistic_event_struct_instantiations,
2045                &this.blocking_cp
2046            )
2047        }));
2048
2049        futures::future::try_join_all(futures)
2050            .await
2051            .map_err(|e| {
2052                tracing::error!("Failed to join optimistic event indices futures: {e}");
2053                IndexerError::from(e)
2054            })?
2055            .into_iter()
2056            .collect::<Result<Vec<_>, _>>()
2057            .map_err(|e| {
2058                IndexerError::PostgresWrite(format!(
2059                    "Failed to persist all optimistic event indices: {e:?}",
2060                ))
2061            })?;
2062        info!("Persisted optimistic event indices");
2063        Ok(())
2064    }
2065
2066    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2067        if indices.is_empty() {
2068            return Ok(());
2069        }
2070        let len = indices.len();
2071        let guard = self
2072            .metrics
2073            .checkpoint_db_commit_latency_tx_indices
2074            .start_timer();
2075        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2076
2077        let futures = chunks.into_iter().map(|chunk| {
2078            self.spawn_task(
2079                move |this: Self| async move { this.persist_tx_indices_chunk(chunk).await },
2080            )
2081        });
2082        futures::future::try_join_all(futures)
2083            .await
2084            .map_err(|e| {
2085                tracing::error!("Failed to join persist_tx_indices_chunk futures: {}", e);
2086                IndexerError::from(e)
2087            })?
2088            .into_iter()
2089            .collect::<Result<Vec<_>, _>>()
2090            .map_err(|e| {
2091                IndexerError::PostgresWrite(format!(
2092                    "Failed to persist all tx_indices chunks: {e:?}"
2093                ))
2094            })?;
2095        let elapsed = guard.stop_and_record();
2096        info!(elapsed, "Persisted {} tx_indices", len);
2097        Ok(())
2098    }
2099
2100    async fn persist_optimistic_tx_indices(
2101        &self,
2102        indices: OptimisticTxIndices,
2103    ) -> Result<(), IndexerError> {
2104        let OptimisticTxIndices {
2105            optimistic_tx_senders: senders,
2106            optimistic_tx_recipients: recipients,
2107            optimistic_tx_input_objects: input_objects,
2108            optimistic_tx_changed_objects: changed_objects,
2109            optimistic_tx_pkgs: pkgs,
2110            optimistic_tx_mods: mods,
2111            optimistic_tx_funs: funs,
2112            optimistic_tx_kinds: kinds,
2113        } = indices;
2114
2115        let mut futures = vec![];
2116        futures.push(self.spawn_blocking_task(move |this| {
2117            persist_chunk_into_table!(optimistic_tx_senders::table, senders, &this.blocking_cp)
2118        }));
2119
2120        futures.push(self.spawn_blocking_task(move |this| {
2121            persist_chunk_into_table!(
2122                optimistic_tx_recipients::table,
2123                recipients,
2124                &this.blocking_cp
2125            )
2126        }));
2127
2128        futures.push(self.spawn_blocking_task(move |this| {
2129            persist_chunk_into_table!(
2130                optimistic_tx_input_objects::table,
2131                input_objects,
2132                &this.blocking_cp
2133            )
2134        }));
2135
2136        futures.push(self.spawn_blocking_task(move |this| {
2137            persist_chunk_into_table!(
2138                optimistic_tx_changed_objects::table,
2139                changed_objects,
2140                &this.blocking_cp
2141            )
2142        }));
2143
2144        futures.push(self.spawn_blocking_task(move |this| {
2145            persist_chunk_into_table!(optimistic_tx_calls_pkg::table, pkgs, &this.blocking_cp)
2146        }));
2147
2148        futures.push(self.spawn_blocking_task(move |this| {
2149            persist_chunk_into_table!(optimistic_tx_calls_mod::table, mods, &this.blocking_cp)
2150        }));
2151
2152        futures.push(self.spawn_blocking_task(move |this| {
2153            persist_chunk_into_table!(optimistic_tx_calls_fun::table, funs, &this.blocking_cp)
2154        }));
2155
2156        futures.push(self.spawn_blocking_task(move |this| {
2157            persist_chunk_into_table!(optimistic_tx_kinds::table, kinds, &this.blocking_cp)
2158        }));
2159
2160        futures::future::try_join_all(futures)
2161            .await
2162            .map_err(|e| {
2163                tracing::error!("Failed to join optimistic tx indices futures: {e}");
2164                IndexerError::from(e)
2165            })?
2166            .into_iter()
2167            .collect::<Result<Vec<_>, _>>()
2168            .map_err(|e| {
2169                IndexerError::PostgresWrite(format!(
2170                    "Failed to persist all optimistic tx indices: {e:?}",
2171                ))
2172            })?;
2173        info!("Persisted optimistic tx indices");
2174        Ok(())
2175    }
2176
2177    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2178        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2179            .await
2180    }
2181
2182    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2183        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2184            .await
2185    }
2186
2187    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
2188        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
2189            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
2190            _ => Err(IndexerError::PostgresRead(format!(
2191                "Failed to get checkpoint range for epoch {epoch}"
2192            ))),
2193        }?;
2194
2195        // NOTE: for disaster recovery, min_cp is the first checkpoint of the current
2196        // epoch, which has likely been partially pruned already, while min_prunable_cp
2197        // is the first checkpoint still awaiting pruning. Taking the max of the two
2198        // resumes pruning where it left off rather than from the start of the epoch.
2199        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
2200        min_cp = std::cmp::max(min_cp, min_prunable_cp);
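        // For example, if the epoch spans checkpoints 1000..=1999 and a prior
        // pruner run already advanced the watermark so that min_prunable_cp is
        // 1500, the loop below resumes at 1500 instead of re-deleting
        // 1000..=1499. (The checkpoint numbers are illustrative only.)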
2201        for cp in min_cp..=max_cp {
2202            // NOTE: the order of pruning tables is crucial:
2203            // 1. prune the checkpoints table first: it is the source table of the
2204            //    available range, so pruning it first ensures that we always have
2205            //    full data for every checkpoint within the available range;
2206            // 2. then prune the tx_* tables;
2207            // 3. then prune the pruner_cp_watermark table, which is the checkpoint
2208            //    pruning watermark table and also the source of the tx sequence
2209            //    number range used to prune the tx_* tables for a checkpoint;
2210            // 4. lastly, prune the epochs table once all checkpoints of the epoch
2211            //    have been pruned.
2213            info!(
2214                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
2215                cp, epoch, min_prunable_cp
2216            );
2217            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
2218                .await
2219                .unwrap_or_else(|e| {
2220                    tracing::error!("Failed to prune checkpoint {}: {}", cp, e);
2221                });
2222
2223            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
2224            self.execute_in_blocking_worker(move |this| {
2225                this.prune_tx_indices_table(min_tx, max_tx)
2226            })
2227            .await
2228            .unwrap_or_else(|e| {
2229                tracing::error!("Failed to prune transactions for cp {}: {}", cp, e);
2230            });
2231            info!(
2232                "Pruned transactions for checkpoint {} from tx {} to tx {}",
2233                cp, min_tx, max_tx
2234            );
2235            self.execute_in_blocking_worker(move |this| {
2236                this.prune_event_indices_table(min_tx, max_tx)
2237            })
2238            .await
2239            .unwrap_or_else(|e| {
2240                tracing::error!(
2241                    "Failed to prune events of transactions for cp {}: {}",
2242                    cp,
2243                    e
2244                );
2245            });
2246            info!(
2247                "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
2248                cp, min_tx, max_tx
2249            );
2250            self.metrics.last_pruned_transaction.set(max_tx as i64);
2251
2252            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
2253                .await
2254                .unwrap_or_else(|e| {
2255                    tracing::error!(
2256                        "Failed to prune pruner_cp_watermark table for cp {}: {}",
2257                        cp,
2258                        e
2259                    );
2260                });
2261            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2262            self.metrics.last_pruned_checkpoint.set(cp as i64);
2263        }
2264
2265        Ok(())
2266    }
2267
2268    async fn get_network_total_transactions_by_end_of_epoch(
2269        &self,
2270        epoch: u64,
2271    ) -> Result<u64, IndexerError> {
2272        self.execute_in_blocking_worker(move |this| {
2273            this.get_network_total_transactions_by_end_of_epoch(epoch)
2274        })
2275        .await
2276    }
2277
2278    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2279        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2280            .await
2281    }
2282
2283    fn as_any(&self) -> &dyn StdAny {
2284        self
2285    }
2286
2287    /// Persist protocol configs and feature flags up to and including the
2288    /// protocol version of the latest epoch stored in the db.
2289    fn persist_protocol_configs_and_feature_flags(
2290        &self,
2291        chain_id: Vec<u8>,
2292    ) -> Result<(), IndexerError> {
2293        let chain_id = ChainIdentifier::from(
2294            CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
2295        );
2296
2297        let mut all_configs = vec![];
2298        let mut all_flags = vec![];
2299
2300        let (start_version, end_version) = self.get_protocol_version_index_range()?;
2301        info!(
2302            "Persisting protocol configs with start_version: {}, end_version: {}",
2303            start_version, end_version
2304        );
2305
2306        // Gather all protocol configs and feature flags for all versions between start
2307        // and end.
2308        for version in start_version..=end_version {
2309            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2310                (version as u64).into(),
2311                chain_id.chain(),
2312            )
2313            .ok_or(IndexerError::Generic(format!(
2314                "Unable to fetch protocol config for version {} and chain {:?}",
2315                version,
2316                chain_id.chain()
2317            )))?;
2318            let configs_vec = protocol_configs
2319                .attr_map()
2320                .into_iter()
2321                .map(|(k, v)| StoredProtocolConfig {
2322                    protocol_version: version,
2323                    config_name: k,
2324                    config_value: v.map(|v| v.to_string()),
2325                })
2326                .collect::<Vec<_>>();
2327            all_configs.extend(configs_vec);
2328
2329            let feature_flags = protocol_configs
2330                .feature_map()
2331                .into_iter()
2332                .map(|(k, v)| StoredFeatureFlag {
2333                    protocol_version: version,
2334                    flag_name: k,
2335                    flag_value: v,
2336                })
2337                .collect::<Vec<_>>();
2338            all_flags.extend(feature_flags);
2339        }
2340
2341        transactional_blocking_with_retry!(
2342            &self.blocking_cp,
2343            |conn| {
2344                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2345                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2346                }
2347                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
2348                Ok::<(), IndexerError>(())
2349            },
2350            PG_DB_COMMIT_SLEEP_DURATION
2351        )?;
2352        Ok(())
2353    }
2354}
2355
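// Note on the chunked writes in `persist_protocol_configs_and_feature_flags`:
// inserting `all_configs` in `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX`-sized slices
// keeps each generated INSERT below PostgreSQL's per-statement bind-parameter
// limit (65535 parameters) while still committing everything in one
// transaction, so a retry of the surrounding
// `transactional_blocking_with_retry!` block replays the whole batch.
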
2356fn make_objects_history_to_commit(
2357    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2358) -> Vec<StoredHistoryObject> {
2359    // Split each change set in a single pass instead of cloning the whole
2360    // vector of object changes.
2361    let mut deleted_objects: Vec<StoredHistoryObject> = vec![];
2362    let mut mutated_objects: Vec<StoredHistoryObject> = vec![];
2363    for changes in tx_object_changes {
2364        deleted_objects.extend(changes.deleted_objects.into_iter().map(Into::into));
2365        mutated_objects.extend(changes.changed_objects.into_iter().map(Into::into));
2366    }
2367    deleted_objects.extend(mutated_objects);
2368    deleted_objects
2369}
2372
2373/// Partition object changes into mutations and deletions. Within each
2374/// partition, retain only the entry with the highest version per object; when
2375/// an object appears in both, keep only the change with the higher version.
2376/// This is necessary because after this step the DB commit is done in
2377/// parallel rather than in order.
2378fn retain_latest_indexed_objects(
2379    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2380) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2381    // Only the last deletion / mutation of each object ends up in the maps,
2382    // because tx_object_changes are in order and versions always increment.
2383    let (mutations, deletions) = tx_object_changes
2384        .into_iter()
2385        .flat_map(|change| {
2386            change
2387                .changed_objects
2388                .into_iter()
2389                .map(Either::Left)
2390                .chain(
2391                    change
2392                        .deleted_objects
2393                        .into_iter()
2394                        .map(Either::Right),
2395                )
2396        })
2397        .fold(
2398            (HashMap::<ObjectID, IndexedObject>::new(), HashMap::<ObjectID, IndexedDeletedObject>::new()),
2399            |(mut mutations, mut deletions), either_change| {
2400                match either_change {
2401                    // Remove mutation / deletion with a following deletion / mutation,
2402                    // b/c following deletion / mutation always has a higher version.
2403                    // Technically, assertions below are not required, double check just in case.
2404                    Either::Left(mutation) => {
2405                        let id = mutation.object.id();
2406                        let mutation_version = mutation.object.version();
2407                        if let Some(existing) = deletions.remove(&id) {
2408                            assert!(
2409                                existing.object_version < mutation_version.value(),
2410                                "Mutation version ({mutation_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2411                                existing.object_version
2412                            );
2413                        }
2414                        if let Some(existing) = mutations.insert(id, mutation) {
2415                            assert!(
2416                                existing.object.version() < mutation_version,
2417                                "Mutation version ({mutation_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2418                                existing.object.version()
2419                            );
2420                        }
2421                    }
2422                    Either::Right(deletion) => {
2423                        let id = deletion.object_id;
2424                        let deletion_version = deletion.object_version;
2425                        if let Some(existing) = mutations.remove(&id) {
2426                            assert!(
2427                                existing.object.version().value() < deletion_version,
2428                                "Deletion version ({deletion_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2429                                existing.object.version(),
2430                            );
2431                        }
2432                        if let Some(existing) = deletions.insert(id, deletion) {
2433                            assert!(
2434                                existing.object_version < deletion_version,
2435                                "Deletion version ({deletion_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2436                                existing.object_version
2437                            );
2438                        }
2439                    }
2440                }
2441                (mutations, deletions)
2442            },
2443        );
2444    (
2445        mutations.into_values().collect(),
2446        deletions.into_values().collect(),
2447    )
2448}
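
// A minimal, self-contained sketch of the retain-latest semantics above,
// using plain (id, version) pairs instead of the real `IndexedObject` /
// `IndexedDeletedObject` types, which are awkward to construct in an example.
// Because changes arrive in order with strictly increasing versions, the last
// write for an id wins, and a later deletion/mutation evicts the entry in the
// opposite map. The helper and test below are illustrative only and are not
// used by production code.
#[cfg(test)]
mod retain_latest_sketch {
    use std::collections::HashMap;

    /// Keep only the highest-versioned change per id, relying on input order;
    /// a change present in both maps keeps only the later (higher-versioned)
    /// side, mirroring `retain_latest_indexed_objects`.
    fn retain_latest(
        changes: Vec<(u64, u64, bool)>, // (object id, version, is_deletion)
    ) -> (HashMap<u64, u64>, HashMap<u64, u64>) {
        let mut mutations = HashMap::new();
        let mut deletions = HashMap::new();
        for (id, version, is_deletion) in changes {
            if is_deletion {
                // A later deletion supersedes any earlier mutation.
                mutations.remove(&id);
                deletions.insert(id, version);
            } else {
                // A later mutation supersedes any earlier deletion.
                deletions.remove(&id);
                mutations.insert(id, version);
            }
        }
        (mutations, deletions)
    }

    #[test]
    fn later_change_supersedes_earlier_one() {
        // Object 1 is mutated at v3 and later deleted at v4; object 2 is
        // mutated twice, so only v7 survives.
        let (mutations, deletions) =
            retain_latest(vec![(1, 3, false), (2, 5, false), (1, 4, true), (2, 7, false)]);
        assert_eq!(deletions.get(&1), Some(&4));
        assert!(!mutations.contains_key(&1));
        assert_eq!(mutations.get(&2), Some(&7));
        assert!(!deletions.contains_key(&2));
    }
}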