use core::result::Result::Ok;
use std::{
    any::Any as StdAny,
    collections::{BTreeMap, HashMap},
    time::Duration,
};

use async_trait::async_trait;
use diesel::{
    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
    dsl::{max, min, sql},
    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
    upsert::excluded,
};
use downcast::Any;
use iota_protocol_config::ProtocolConfig;
use iota_types::{
    base_types::ObjectID,
    digests::{ChainIdentifier, CheckpointDigest},
    messages_checkpoint::CheckpointSequenceNumber,
};
use itertools::Itertools;
use strum::IntoEnumIterator;
use tap::TapFallible;
use tracing::info;

use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
use crate::{
    blocking_call_is_ok_or_panic,
    db::ConnectionPool,
    errors::{Context, IndexerError, IndexerResult},
    ingestion::{
        common::{
            persist::{CommitterWatermark, ObjectsSnapshotHandlerTables},
            prepare::{
                CheckpointObjectChanges, LiveObject, RemovedObject,
                retain_latest_objects_from_checkpoint_batch,
            },
        },
        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
    },
    insert_or_ignore_into,
    metrics::IndexerMetrics,
    models::{
        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
        display::StoredDisplay,
        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
        events::StoredEvent,
        obj_indices::StoredObjectVersion,
        objects::{
            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
            StoredObjects,
        },
        packages::StoredPackage,
        transactions::{
            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
        },
        tx_indices::TxIndexSplit,
        watermarks::StoredWatermark,
    },
    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
    persist_chunk_into_table_in_existing_connection,
    pruning::pruner::PrunableTable,
    read_only_blocking, run_query, run_query_with_retry,
    schema::{
        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
        tx_wrapped_or_deleted_objects, watermarks,
    },
    store::{IndexerStore, diesel_macro::mark_in_blocking_pool},
    transactional_blocking_with_retry,
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, TxIndex,
    },
};

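/// Cursor into the `tx_global_order` table, identifying a transaction by its
/// `(global_sequence_number, optimistic_sequence_number)` pair.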
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TxGlobalOrderCursor {
    pub global_sequence_number: i64,
    pub optimistic_sequence_number: i64,
}

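/// Splits `$data` into owned chunks of at most `$size` elements, collected as a
/// `Vec<Vec<_>>` (backed by `itertools::Itertools::chunks`).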
94#[macro_export]
95macro_rules! chunk {
96 ($data: expr, $size: expr) => {{
97 $data
98 .into_iter()
99 .chunks($size)
100 .into_iter()
101 .map(|c| c.collect())
102 .collect::<Vec<Vec<_>>>()
103 }};
104}
105
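/// Deletes every row of `$table` whose `tx_sequence_number` lies in
/// `[$min_tx, $max_tx]`, attaching `$context_msg` to any resulting error.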
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}

/// Maximum number of rows written per statement within a single DB transaction.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
/// Default rows per parallel commit task; overridable via the
/// `PG_COMMIT_PARALLEL_CHUNK_SIZE` environment variable.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
/// Default object rows per parallel commit task; overridable via the
/// `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` environment variable.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
/// Duration argument handed to the blocking commit/retry macros below.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);

#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    pub parallel_chunk_size: usize,
    pub parallel_objects_chunk_size: usize,
}

pub struct PgIndexerStore {
    blocking_cp: ConnectionPool,
    metrics: IndexerMetrics,
    partition_manager: PgPartitionManager,
    config: PgIndexerStoreConfig,
}

impl Clone for PgIndexerStore {
    fn clone(&self) -> PgIndexerStore {
        Self {
            blocking_cp: self.blocking_cp.clone(),
            metrics: self.metrics.clone(),
            partition_manager: self.partition_manager.clone(),
            config: self.config.clone(),
        }
    }
}

impl PgIndexerStore {
    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .expect("PG_COMMIT_PARALLEL_CHUNK_SIZE must be a valid usize");
        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .expect("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE must be a valid usize");
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
            .expect("failed to initialize partition manager");
        let config = PgIndexerStoreConfig {
            parallel_chunk_size,
            parallel_objects_chunk_size,
        };

        Self {
            blocking_cp,
            metrics,
            partition_manager,
            config,
        }
    }

    pub fn get_metrics(&self) -> IndexerMetrics {
        self.metrics.clone()
    }

    pub fn blocking_cp(&self) -> ConnectionPool {
        self.blocking_cp.clone()
    }

    pub(crate) async fn get_latest_epoch_id_in_blocking_worker(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_latest_epoch_id())
            .await
    }

    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::epoch))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest epoch id from PostgresDB")
    }

    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        // Start from the next version after the latest stored protocol config,
        // defaulting to 1 when the table is empty.
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        // End at the protocol version of the latest known epoch.
        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }

    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }

    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select(max(checkpoints::sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest checkpoint sequence number from PostgresDB")
    }

    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select((
                    min(checkpoints::sequence_number),
                    max(checkpoints::sequence_number),
                ))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
    }

    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((min(epochs::epoch), max(epochs::epoch)))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max epoch numbers from PostgresDB")
    }

    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.unwrap_or_default() as u64)
        })
        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
    }

    fn get_checkpoint_range_for_epoch(
        &self,
        epoch: u64,
    ) -> Result<(u64, Option<u64>), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
                .filter(epochs::epoch.eq(epoch as i64))
                .first::<(i64, Option<i64>)>(conn)
                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
        })
        .context(
            format!("failed reading checkpoint range from PostgresDB for epoch {epoch}").as_str(),
        )
    }

    fn get_transaction_range_for_checkpoint(
        &self,
        checkpoint: u64,
    ) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select((
                    pruner_cp_watermark::min_tx_sequence_number,
                    pruner_cp_watermark::max_tx_sequence_number,
                ))
                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
                .first::<(i64, i64)>(conn)
                .map(|(min, max)| (min as u64, max as u64))
        })
        .context(
            format!("failed reading transaction range from PostgresDB for checkpoint {checkpoint}")
                .as_str(),
        )
    }

    pub(crate) async fn get_global_order_for_tx_seq_in_blocking_worker(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_global_order_for_tx_seq(tx_seq))
            .await
    }

    fn get_global_order_for_tx_seq(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        let result = read_only_blocking!(&self.blocking_cp, |conn| {
            tx_global_order::dsl::tx_global_order
                .select((
                    tx_global_order::global_sequence_number,
                    tx_global_order::optimistic_sequence_number,
                ))
                .filter(tx_global_order::chk_tx_sequence_number.eq(tx_seq))
                .first::<(i64, i64)>(conn)
        })
        .context(
            format!("failed reading global sequence number from PostgresDB for tx seq {tx_seq}")
                .as_str(),
        )?;
        let (global_sequence_number, optimistic_sequence_number) = result;
        Ok(TxGlobalOrderCursor {
            global_sequence_number,
            optimistic_sequence_number,
        })
    }

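    /// Deletes up to `limit` rows from `optimistic_transactions` whose
    /// `(global_sequence_number, optimistic_sequence_number)` pair is at most
    /// `to`, locking the selected rows so concurrent pruners do not race.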
    pub(crate) async fn prune_optimistic_transactions_up_to_in_blocking_worker(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        self.execute_in_blocking_worker(move |this| {
            this.prune_optimistic_transactions_up_to(to, limit)
        })
        .await
    }

    fn prune_optimistic_transactions_up_to(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                let sql = r#"
                WITH ids_to_delete AS (
                    SELECT global_sequence_number, optimistic_sequence_number
                    FROM optimistic_transactions
                    WHERE (global_sequence_number, optimistic_sequence_number) <= ($1, $2)
                    ORDER BY global_sequence_number, optimistic_sequence_number
                    LIMIT $3
                    FOR UPDATE
                )
                DELETE FROM optimistic_transactions otx
                USING ids_to_delete
                WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
                      (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
                "#;
                diesel::sql_query(sql)
                    .bind::<BigInt, _>(to.global_sequence_number)
                    .bind::<BigInt, _>(to.optimistic_sequence_number)
                    .bind::<BigInt, _>(limit)
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context(
                        format!("failed to prune optimistic_transactions table to {to:?} with limit {limit}").as_str(),
                    )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_latest_object_snapshot_watermark(
        &self,
    ) -> Result<Option<CommitterWatermark>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            watermarks::table
                .select((
                    watermarks::epoch_hi_inclusive,
                    watermarks::checkpoint_hi_inclusive,
                    watermarks::tx_hi,
                ))
                .filter(
                    watermarks::entity
                        .eq(ObjectsSnapshotHandlerTables::ObjectsSnapshot.to_string()),
                )
                .first::<(i64, i64, i64)>(conn)
                .optional()
                .map(|v| {
                    v.map(|(epoch, cp, tx)| CommitterWatermark {
                        epoch_hi_inclusive: epoch as u64,
                        checkpoint_hi_inclusive: cp as u64,
                        tx_hi: tx as u64,
                    })
                })
        })
        .context("Failed reading latest object snapshot watermark from PostgresDB")
    }

    fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            objects_snapshot::table
                .select(max(objects_snapshot::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as CheckpointSequenceNumber))
        })
        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
    }

    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            {
                let value = display_updates.values().collect::<Vec<_>>();
                |conn| self.persist_displays_in_existing_transaction(conn, value)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }
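
    /// Upserts live objects into the `objects` table, skipping any change whose
    /// transaction is still tracked as optimistic in `tx_global_order` (i.e.
    /// has a non-zero `optimistic_sequence_number`).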
457
458 fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
459 let guard = self
460 .metrics
461 .checkpoint_db_commit_latency_objects_chunks
462 .start_timer();
463 let len = objects.len();
464 let raw_query = r#"
465 INSERT INTO objects (
466 object_id,
467 object_version,
468 object_digest,
469 owner_type,
470 owner_id,
471 object_type,
472 object_type_package,
473 object_type_module,
474 object_type_name,
475 serialized_object,
476 coin_type,
477 coin_balance,
478 df_kind
479 )
480 SELECT
481 u.object_id,
482 u.object_version,
483 u.object_digest,
484 u.owner_type,
485 u.owner_id,
486 u.object_type,
487 u.object_type_package,
488 u.object_type_module,
489 u.object_type_name,
490 u.serialized_object,
491 u.coin_type,
492 u.coin_balance,
493 u.df_kind
494 FROM UNNEST(
495 $1::BYTEA[],
496 $2::BIGINT[],
497 $3::BYTEA[],
498 $4::SMALLINT[],
499 $5::BYTEA[],
500 $6::TEXT[],
501 $7::BYTEA[],
502 $8::TEXT[],
503 $9::TEXT[],
504 $10::BYTEA[],
505 $11::TEXT[],
506 $12::BIGINT[],
507 $13::SMALLINT[],
508 $14::BYTEA[]
509 ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest)
510 LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
511 WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
512 ON CONFLICT (object_id) DO UPDATE
513 SET
514 object_version = EXCLUDED.object_version,
515 object_digest = EXCLUDED.object_digest,
516 owner_type = EXCLUDED.owner_type,
517 owner_id = EXCLUDED.owner_id,
518 object_type = EXCLUDED.object_type,
519 object_type_package = EXCLUDED.object_type_package,
520 object_type_module = EXCLUDED.object_type_module,
521 object_type_name = EXCLUDED.object_type_name,
522 serialized_object = EXCLUDED.serialized_object,
523 coin_type = EXCLUDED.coin_type,
524 coin_balance = EXCLUDED.coin_balance,
525 df_kind = EXCLUDED.df_kind
526 "#;
527 let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
528 .into_iter()
529 .map(LiveObject::split)
530 .map(|(indexed_object, tx_digest)| {
531 (
532 StoredObject::from(indexed_object),
533 tx_digest.into_inner().to_vec(),
534 )
535 })
536 .unzip();
537 let query = diesel::sql_query(raw_query)
538 .bind::<Array<Bytea>, _>(objects.object_ids)
539 .bind::<Array<BigInt>, _>(objects.object_versions)
540 .bind::<Array<Bytea>, _>(objects.object_digests)
541 .bind::<Array<SmallInt>, _>(objects.owner_types)
542 .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
543 .bind::<Array<Nullable<Text>>, _>(objects.object_types)
544 .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
545 .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
546 .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
547 .bind::<Array<Bytea>, _>(objects.serialized_objects)
548 .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
549 .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
550 .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
551 .bind::<Array<Bytea>, _>(tx_digests);
552 transactional_blocking_with_retry!(
553 &self.blocking_cp,
554 |conn| {
555 query.clone().execute(conn)?;
556 Ok::<(), IndexerError>(())
557 },
558 PG_DB_COMMIT_SLEEP_DURATION
559 )
560 .tap_ok(|_| {
561 let elapsed = guard.stop_and_record();
562 info!(elapsed, "Persisted {len} chunked objects");
563 })
564 .tap_err(|e| {
565 tracing::error!("failed to persist object mutations with error: {e}");
566 })
567 }
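
    /// Deletes removed objects from the `objects` table, skipping any removal
    /// whose transaction is still tracked as optimistic in `tx_global_order`.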
568
569 fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
570 let guard = self
571 .metrics
572 .checkpoint_db_commit_latency_objects_chunks
573 .start_timer();
574 let len = objects.len();
575 let raw_query = r#"
576 DELETE FROM objects
577 WHERE object_id IN (
578 SELECT u.object_id
579 FROM UNNEST(
580 $1::BYTEA[],
581 $2::BYTEA[]
582 ) AS u(object_id, tx_digest)
583 LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
584 WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
585 )
586 "#;
587 let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
588 .into_iter()
589 .map(|removed_object| {
590 (
591 removed_object.object_id().to_vec(),
592 removed_object.transaction_digest.into_inner().to_vec(),
593 )
594 })
595 .unzip();
596 let query = diesel::sql_query(raw_query)
597 .bind::<Array<Bytea>, _>(object_ids)
598 .bind::<Array<Bytea>, _>(tx_digests);
599 transactional_blocking_with_retry!(
600 &self.blocking_cp,
601 |conn| {
602 query.clone().execute(conn)?;
603 Ok::<(), IndexerError>(())
604 },
605 PG_DB_COMMIT_SLEEP_DURATION
606 )
607 .tap_ok(|_| {
608 let elapsed = guard.stop_and_record();
609 info!(elapsed, "Deleted {len} chunked objects");
610 })
611 .tap_err(|e| {
612 tracing::error!("failed to persist object deletions with error: {e}");
613 })
614 }

    fn persist_object_mutation_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        on_conflict_do_update!(
            objects::table,
            mutated_object_mutation_chunk,
            objects::object_id,
            (
                objects::object_id.eq(excluded(objects::object_id)),
                objects::object_version.eq(excluded(objects::object_version)),
                objects::object_digest.eq(excluded(objects::object_digest)),
                objects::owner_type.eq(excluded(objects::owner_type)),
                objects::owner_id.eq(excluded(objects::owner_id)),
                objects::object_type.eq(excluded(objects::object_type)),
                objects::serialized_object.eq(excluded(objects::serialized_object)),
                objects::coin_type.eq(excluded(objects::coin_type)),
                objects::coin_balance.eq(excluded(objects::coin_balance)),
                objects::df_kind.eq(excluded(objects::df_kind)),
            ),
            conn
        );
        Ok::<(), IndexerError>(())
    }

    fn persist_object_deletion_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        diesel::delete(
            objects::table.filter(
                objects::object_id.eq_any(
                    deleted_objects_chunk
                        .iter()
                        .map(|o| o.object_id.clone())
                        .collect::<Vec<_>>(),
                ),
            ),
        )
        .execute(conn)
        .map_err(IndexerError::from)
        .context("Failed to write object deletion to PostgresDB")?;

        Ok::<(), IndexerError>(())
    }

    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object snapshot with error: {e}");
        })
    }

    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object history with error: {e}");
        })
    }

    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object versions with error: {e}");
        })
    }

    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

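        // The genesis checkpoint (sequence number 0) additionally pins the
        // chain identifier and the initial protocol configs / feature flags.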
        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics
                            .max_committed_checkpoint_sequence_number
                            .set(stored_checkpoint.sequence_number);
                        self.metrics
                            .committed_checkpoint_timestamp_ms
                            .set(stored_checkpoint.timestamp_ms);
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!(
                            "Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}",
                            stored_checkpoint.sequence_number,
                            time_now_ms,
                            stored_checkpoint.timestamp_ms
                        );
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist checkpoints with error: {e}");
        })
    }

    fn persist_transactions_chunk(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks
            .start_timer();
        let transformation_guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks_transformation
            .start_timer();
        let transactions = transactions
            .iter()
            .map(StoredTransaction::from)
            .collect::<Vec<_>>();
        drop(transformation_guard);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked transactions",
                transactions.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist transactions with error: {e}");
        })
    }

    fn persist_tx_global_order_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist txs insertion order with error: {e}");
        })
    }

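    /// Finalizes `tx_global_order` rows for checkpointed transactions: entries
    /// still marked `Started` are moved to `Completed`, and rows whose
    /// `chk_tx_sequence_number` is unset get it filled in from the checkpoint.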
    fn update_status_for_checkpoint_transactions_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        let num_transactions = tx_order.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
                    conn
                );
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::chk_tx_sequence_number
                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
                    tx_global_order::chk_tx_sequence_number.is_null(),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Updated {} chunked values of `tx_global_order`", num_transactions
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to update `tx_global_order` with error: {e}");
        })
    }

    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events_chunks
            .start_timer();
        let len = events.len();
        let events = events
            .into_iter()
            .map(StoredEvent::from)
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(events::table, event_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked events", len);
        })
        .tap_err(|e| {
            tracing::error!("failed to persist events with error: {e}");
        })
    }

    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_packages
            .start_timer();
        let packages = packages
            .into_iter()
            .map(StoredPackage::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update!(
                        packages::table,
                        packages_chunk,
                        packages::package_id,
                        (
                            packages::package_id.eq(excluded(packages::package_id)),
                            packages::move_package.eq(excluded(packages::move_package)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} packages", packages.len());
        })
        .tap_err(|e| {
            tracing::error!("failed to persist packages with error: {e}");
        })
    }

    async fn persist_event_indices_chunk(
        &self,
        indices: Vec<EventIndex>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices_chunks
            .start_timer();
        let len = indices.len();
        let (
            event_emit_packages,
            event_emit_modules,
            event_senders,
            event_struct_packages,
            event_struct_modules,
            event_struct_names,
            event_struct_instantiations,
        ) = indices.into_iter().map(|i| i.split()).fold(
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            ),
            |(
                mut event_emit_packages,
                mut event_emit_modules,
                mut event_senders,
                mut event_struct_packages,
                mut event_struct_modules,
                mut event_struct_names,
                mut event_struct_instantiations,
            ),
             index| {
                event_emit_packages.push(index.0);
                event_emit_modules.push(index.1);
                event_senders.push(index.2);
                event_struct_packages.push(index.3);
                event_struct_modules.push(index.4);
                event_struct_names.push(index.5);
                event_struct_instantiations.push(index.6);
                (
                    event_emit_packages,
                    event_emit_modules,
                    event_senders,
                    event_struct_packages,
                    event_struct_modules,
                    event_struct_names,
                    event_struct_instantiations,
                )
            },
        );

        let mut futures = vec![];
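        // Spawn one blocking task per index table so all seven inserts proceed
        // in parallel.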
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_package::table,
                event_emit_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_module::table,
                event_emit_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_package::table,
                event_struct_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_module::table,
                event_struct_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_name::table,
                event_struct_names,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_instantiation::table,
                event_struct_instantiations,
                &this.blocking_cp
            )
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join event indices futures in a chunk: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked event indices", len);
        Ok(())
    }

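    /// Splits each `TxIndex` into its per-table rows and persists the ten
    /// transaction index tables in parallel on the blocking pool.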
    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices_chunks
            .start_timer();
        let len = indices.len();

        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();

        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
        let recipients: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_recipients.clone())
            .collect();
        let input_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_input_objects.clone())
            .collect();
        let changed_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_changed_objects.clone())
            .collect();
        let wrapped_or_deleted_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
            .collect();
        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();

        let futures = [
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_changed_objects::table,
                    changed_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_wrapped_or_deleted_objects::table,
                    wrapped_or_deleted_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
            }),
        ];

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join tx indices futures in a chunk: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked tx_indices", len);
        Ok(())
    }

    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                if let Some(last_epoch) = &epoch.last_epoch {
                    info!(last_epoch.epoch, "Persisting epoch end data.");
                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
                        .set(last_epoch)
                        .execute(conn)?;
                }

                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist epoch with error: {e}");
        })
    }

    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
        if let Some(last_epoch_id) = last_epoch_id {
            // Read back the finished epoch so partition boundaries can be computed.
            let last_db_epoch: Option<StoredEpochInfo> =
                read_only_blocking!(&self.blocking_cp, |conn| {
                    epochs::table
                        .filter(epochs::epoch.eq(last_epoch_id))
                        .first::<StoredEpochInfo>(conn)
                        .optional()
                })
                .context("Failed to read last epoch from PostgresDB")?;
            if let Some(last_epoch) = last_db_epoch {
                let epoch_partition_data =
                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
                let table_partitions = self.partition_manager.get_table_partitions()?;
                for (table, (_, last_partition)) in table_partitions {
                    // Only advance tables that are actually partitioned by epoch.
                    if !self
                        .partition_manager
                        .get_strategy(&table)
                        .is_epoch_partitioned()
                    {
                        continue;
                    }
                    let guard = self.metrics.advance_epoch_latency.start_timer();
                    self.partition_manager.advance_epoch(
                        table.clone(),
                        last_partition,
                        &epoch_partition_data,
                    )?;
                    let elapsed = guard.stop_and_record();
                    info!(
                        elapsed,
                        "Advanced epoch partition {} for table {}",
                        last_partition,
                        table.clone()
                    );
                }
            } else {
                tracing::error!("last epoch {last_epoch_id} was not found in PostgresDB.");
            }
        }

        Ok(())
    }

    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    event_emit_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_emit_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_package table"
                );
                prune_tx_or_event_indice_table!(
                    event_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_senders table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_instantiation,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_instantiation table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_name,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_name table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_package table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    tx_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_senders table"
                );
                prune_tx_or_event_indice_table!(
                    tx_recipients,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_recipients table"
                );
                prune_tx_or_event_indice_table!(
                    tx_input_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_input_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_changed_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_changed_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_wrapped_or_deleted_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_wrapped_or_deleted_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_pkg,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_pkg table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_mod,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_mod table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_fun,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_fun table"
                );
                prune_tx_or_event_indice_table!(
                    tx_digests,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_digests table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    pruner_cp_watermark::table
                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune pruner_cp_watermark table")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::table
                .filter(epochs::epoch.eq(epoch as i64))
                .select(epochs::network_total_transactions)
                .get_result::<Option<i64>>(conn)
        })
        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
        .map(|option| option.map(|v| v as u64))
    }

    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("failed to refresh participation_metrics: {e}");
        })
    }

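    /// Upserts the upper-bound watermark (epoch, checkpoint, and tx highs) for
    /// every entity enumerated by `E`.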
    fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();

        let upper_bound_updates = E::iter()
            .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&upper_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
                        watermarks::checkpoint_hi_inclusive
                            .eq(excluded(watermarks::checkpoint_hi_inclusive)),
                        watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
                    ))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("Failed to update watermarks upper bound")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {}", e);
        })
    }

    fn map_epochs_to_cp_tx(
        &self,
        epochs: &[u64],
    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
        let pool = &self.blocking_cp;
        let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
            epochs::table
                .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
                .select((
                    epochs::epoch,
                    epochs::first_checkpoint_id,
                    epochs::first_tx_sequence_number,
                ))
                .load::<(i64, i64, i64)>(conn)
        })
        .context("Failed to fetch first checkpoint and tx seq num for epochs")?;

        Ok(results
            .into_iter()
            .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
            .collect())
    }

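    /// Advances the per-table lower-bound (pruner) watermarks, translating each
    /// epoch into its first checkpoint and tx sequence numbers; the conditional
    /// upsert filters ensure watermarks only ever move forward.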
    fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        use diesel::query_dsl::methods::FilterDsl;

        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
            .into_iter()
            .map(|(table, epoch)| {
                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
                    IndexerError::PersistentStorageDataCorruption(format!(
                        "epoch {epoch} not found in epoch mapping",
                    ))
                })?;
                Ok(StoredWatermark::from_lower_bound_update(
                    table.as_ref(),
                    epoch,
                    table.select_reader_lo(*checkpoint, *tx),
                ))
            })
            .collect();
        let lower_bound_updates = lookups?;
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&lower_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
                        watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
                        watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )),
                    ))
                    .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
                    .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
                    .filter(
                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )
                        .gt(watermarks::timestamp_ms),
                    )
                    .execute(conn)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {}", e);
        })?;
        Ok(())
    }

    fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        // Read all watermarks together with the DB's current timestamp so the
        // caller can reason about their staleness.
        run_query_with_retry!(
            &self.blocking_cp,
            |conn| {
                let stored = watermarks::table
                    .load::<StoredWatermark>(conn)
                    .map_err(Into::into)
                    .context("Failed reading watermarks from PostgresDB")?;
                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                ))
                .get_result(conn)
                .map_err(Into::into)
                .context("Failed reading current timestamp from PostgresDB")?;
                Ok::<_, IndexerError>((stored, timestamp))
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

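    /// Runs `f` on Tokio's blocking thread pool, propagating the current
    /// tracing span, and flattens the join and `f` results into one `Result`.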
    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            mark_in_blocking_pool();
            let _guard = current_span.enter();
            f(this)
        })
        .await
        .map_err(Into::into)
        .and_then(std::convert::identity)
    }

    fn spawn_blocking_task<F, R>(
        &self,
        f: F,
    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
        tokio::task::spawn_blocking(move || {
            mark_in_blocking_pool();
            let _guard = current_span.enter();
            let _elapsed = guard.stop_and_record();
            f(this)
        })
    }

    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        tokio::task::spawn(async move { f(this).await })
    }
}

#[async_trait]
impl IndexerStore for PgIndexerStore {
    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
            .await
    }

    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
            .await
    }

    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
            .await
    }

    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
            .await
    }

    async fn get_latest_object_snapshot_watermark(
        &self,
    ) -> Result<Option<CommitterWatermark>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_latest_object_snapshot_watermark())
            .await
    }

    async fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
        self.execute_in_blocking_worker(|this| {
            this.get_latest_object_snapshot_checkpoint_sequence_number()
        })
        .await
    }

    fn persist_objects_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }

        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let object_mutations = indexed_mutations
            .into_iter()
            .map(StoredObject::from)
            .collect::<Vec<_>>();
        let object_deletions = indexed_deletions
            .into_iter()
            .map(StoredDeletedObject::from)
            .collect::<Vec<_>>();

        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;

        Ok(())
    }

    async fn persist_objects_snapshot(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot
            .start_timer();
        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let objects_snapshot = indexed_mutations
            .into_iter()
            .map(StoredObjectSnapshot::from)
            .chain(
                indexed_deletions
                    .into_iter()
                    .map(StoredObjectSnapshot::from),
            )
            .collect::<Vec<_>>();
        let len = objects_snapshot.len();
        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects snapshot chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects snapshot", len);
        Ok(())
    }
1894
    async fn persist_object_history(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
            .map(|val| val.eq_ignore_ascii_case("true"))
            .unwrap_or(false);
        if skip_history {
            info!("skipping object history");
            return Ok(());
        }

        if object_changes.is_empty() {
            return Ok(());
        }
        let objects = make_objects_history_to_commit(object_changes);
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history
            .start_timer();

        let len = objects.len();
        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects history chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} object history rows", len);
        Ok(())
    }

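    /// Persists `objects_version` rows in parallel chunks.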
    async fn persist_object_versions(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        if object_versions.is_empty() {
            return Ok(());
        }

        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version
            .start_timer();

        let object_versions_count = object_versions.len();

        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
            .collect::<Vec<_>>();

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects version chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {object_versions_count} object versions");
        Ok(())
    }

    async fn persist_checkpoints(
        &self,
        checkpoints: Vec<IndexedCheckpoint>,
    ) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
            .await
    }

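    /// Persists indexed transactions in parallel chunks of
    /// `parallel_chunk_size`.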
    async fn persist_transactions(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions
            .start_timer();
        let len = transactions.len();

        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all transactions chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} transactions", len);
        Ok(())
    }

    fn persist_optimistic_transaction_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
        Ok(())
    }

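    /// Persists indexed events in parallel chunks.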
    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        if events.is_empty() {
            return Ok(());
        }
        let len = events.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events
            .start_timer();
        let chunks = chunk!(events, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_events_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} events", len);
        Ok(())
    }

    async fn persist_displays(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
            .await?
    }

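    /// Upserts display entries keyed by `object_type`, overwriting a stored
    /// row only when the incoming `version` is strictly greater.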
    fn persist_displays_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        display_updates: Vec<&StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        on_conflict_do_update_with_condition!(
            display::table,
            display_updates,
            display::object_type,
            (
                display::id.eq(excluded(display::id)),
                display::version.eq(excluded(display::version)),
                display::bcs.eq(excluded(display::bcs)),
            ),
            excluded(display::version).gt(display::version),
            conn
        );

        Ok(())
    }

    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
            .await
    }

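    /// Persists event index rows in parallel chunks on async tasks.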
    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_event_indices_chunk(chunk).await
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} event_indices", len);
        Ok(())
    }

    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
            .await
    }

    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
            .await
    }

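    /// Prunes all checkpoints of `epoch` starting no lower than the pruner
    /// watermark, removing each checkpoint's row plus its transaction and
    /// event indices and its `pruner_cp_watermark` entry.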
    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
            _ => Err(IndexerError::PostgresRead(format!(
                "Failed to get checkpoint range for epoch {epoch}"
            ))),
        }?;

        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
        // Clamp to the pruner watermark so we never prune below the minimum
        // prunable checkpoint.
        min_cp = std::cmp::max(min_cp, min_prunable_cp);
        for cp in min_cp..=max_cp {
            // For each checkpoint: prune the checkpoint row, then its
            // transaction and event indices, then its pruner watermark entry.
            info!(
                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
                cp, epoch, min_prunable_cp
            );
            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune checkpoint {cp}: {e}");
                });

            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
            self.execute_in_blocking_worker(move |this| {
                this.prune_tx_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune transactions for cp {cp}: {e}");
            });
            info!(
                "Pruned transactions for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.execute_in_blocking_worker(move |this| {
                this.prune_event_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune events of transactions for cp {cp}: {e}");
            });
            info!(
                "Pruned events of transactions for checkpoint {cp} from tx {min_tx} to tx {max_tx}"
            );
            self.metrics.last_pruned_transaction.set(max_tx as i64);

            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune pruner_cp_watermark table for cp {cp}: {e}");
                });
            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
            self.metrics.last_pruned_checkpoint.set(cp as i64);
        }

        Ok(())
    }

    async fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| {
            this.get_network_total_transactions_by_end_of_epoch(epoch)
        })
        .await
    }

    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
            .await
    }

    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        self.execute_in_blocking_worker(move |this| {
            this.update_watermarks_upper_bound::<E>(watermark)
        })
        .await
    }

    fn as_any(&self) -> &dyn StdAny {
        self
    }

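    /// Resolves protocol configs and feature flags for every indexed protocol
    /// version of the given chain and stores them, ignoring conflicts.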
    fn persist_protocol_configs_and_feature_flags(
        &self,
        chain_id: Vec<u8>,
    ) -> Result<(), IndexerError> {
        let chain_id = ChainIdentifier::from(
            CheckpointDigest::try_from(chain_id).expect("unable to convert chain id"),
        );

        let mut all_configs = vec![];
        let mut all_flags = vec![];

        let (start_version, end_version) = self.get_protocol_version_index_range()?;
        info!(
            "Persisting protocol configs with start_version: {}, end_version: {}",
            start_version, end_version
        );

        // Gather the configs and feature flags for every version in the range.
        for version in start_version..=end_version {
            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
                (version as u64).into(),
                chain_id.chain(),
            )
            .ok_or(IndexerError::Generic(format!(
                "Unable to fetch protocol version {} and chain {:?}",
                version,
                chain_id.chain()
            )))?;
            let configs_vec = protocol_configs
                .attr_map()
                .into_iter()
                .map(|(k, v)| StoredProtocolConfig {
                    protocol_version: version,
                    config_name: k,
                    config_value: v.map(|v| v.to_string()),
                })
                .collect::<Vec<_>>();
            all_configs.extend(configs_vec);

            let feature_flags = protocol_configs
                .feature_map()
                .into_iter()
                .map(|(k, v)| StoredFeatureFlag {
                    protocol_version: version,
                    flag_name: k,
                    flag_value: v,
                })
                .collect::<Vec<_>>();
            all_flags.extend(feature_flags);
        }

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
                }
                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;
        Ok(())
    }

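    /// Persists transaction index rows in parallel chunks on async tasks.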
    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_tx_indices_chunk_v2(chunk).await
            })
        });
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_indices_chunk_v2 futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} tx_indices", len);
        Ok(())
    }

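    /// Deduplicates a checkpoint batch to the latest change per object, then
    /// persists mutations and deletions in parallel chunks.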
    async fn persist_checkpoint_objects(
        &self,
        objects: Vec<CheckpointObjectChanges>,
    ) -> Result<(), IndexerError> {
        if objects.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects
            .start_timer();
        let CheckpointObjectChanges {
            changed_objects: mutations,
            deleted_objects: deletions,
        } = retain_latest_objects_from_checkpoint_batch(objects);
        let mutation_len = mutations.len();
        let deletion_len = deletions.len();

        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
        let mutation_futures = mutation_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
        let deletion_futures = deletion_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
            .await
            .map_err(|e| {
                tracing::error!("failed to join futures for persisting objects: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}"))
            })?;

        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
        );
        Ok(())
    }

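    /// Updates the index status for the given checkpoint transactions'
    /// global insertion orders, in parallel chunks.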
    async fn update_status_for_checkpoint_transactions(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks.into_iter().map(|c| {
            self.spawn_blocking_task(move |this| {
                this.update_status_for_checkpoint_transactions_chunk(c)
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "failed to join update_status_for_checkpoint_transactions_chunk futures: {e}",
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to update all `tx_global_order` chunks: {e:?}",
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Updated index status for {len} tx insertion orders"
        );
        Ok(())
    }

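    /// Persists `tx_global_order` rows in parallel chunks.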
    async fn persist_tx_global_order(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx insertion order chunks: {e:?}",
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {len} tx insertion orders");
        Ok(())
    }

    async fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
            .await
    }

    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_watermarks())
            .await
    }
}

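/// Flattens per-transaction object changes into `StoredHistoryObject` rows,
/// listing deletions before mutations.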
fn make_objects_history_to_commit(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Vec<StoredHistoryObject> {
    let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
        .clone()
        .into_iter()
        .flat_map(|changes| changes.deleted_objects)
        .map(|o| o.into())
        .collect();
    let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
        .into_iter()
        .flat_map(|changes| changes.changed_objects)
        .map(|o| o.into())
        .collect();
    deleted_objects.into_iter().chain(mutated_objects).collect()
}

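/// Collapses a batch of per-transaction object changes down to the single
/// latest mutation or deletion per object ID, asserting that object versions
/// strictly increase within the batch.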
fn retain_latest_indexed_objects(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
    use std::collections::HashMap;

    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();

    for change in tx_object_changes {
        for mutation in change.changed_objects {
            let id = mutation.object.id();
            let version = mutation.object.version();

            // A mutation must supersede any previously recorded deletion of
            // the same object.
            if let Some(existing) = deletions.remove(&id) {
                assert!(
                    existing.object_version < version.value(),
                    "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }

            if let Some(existing) = mutations.insert(id, mutation) {
                assert!(
                    existing.object.version() < version,
                    "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version()
                );
            }
        }
        for deletion in change.deleted_objects {
            let id = deletion.object_id;
            let version = deletion.object_version;

            // A deletion must supersede any previously recorded mutation of
            // the same object.
            if let Some(existing) = mutations.remove(&id) {
                assert!(
                    existing.object.version().value() < version,
                    "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version(),
                );
            }

            if let Some(existing) = deletions.insert(id, deletion) {
                assert!(
                    existing.object_version < version,
                    "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }
        }
    }

    (
        mutations.into_values().collect(),
        deletions.into_values().collect(),
    )
}