1use core::result::Result::Ok;
6use std::{
7 any::Any as StdAny,
8 collections::{BTreeMap, HashMap},
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use diesel::{
14 ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
15 dsl::{max, min, sql},
16 sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
17 upsert::excluded,
18};
19use downcast::Any;
20use iota_protocol_config::ProtocolConfig;
21use iota_types::{
22 base_types::ObjectID,
23 digests::{ChainIdentifier, CheckpointDigest},
24 messages_checkpoint::CheckpointSequenceNumber,
25};
26use itertools::Itertools;
27use strum::IntoEnumIterator;
28use tap::TapFallible;
29use tracing::info;
30
31use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
32use crate::{
33 blocking_call_is_ok_or_panic,
34 db::ConnectionPool,
35 errors::{Context, IndexerError},
36 ingestion::{
37 common::{
38 persist::{CommitterWatermark, ObjectsSnapshotHandlerTables},
39 prepare::{
40 CheckpointObjectChanges, LiveObject, RemovedObject,
41 retain_latest_objects_from_checkpoint_batch,
42 },
43 },
44 primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
45 },
46 insert_or_ignore_into,
47 metrics::IndexerMetrics,
48 models::{
49 checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
50 display::StoredDisplay,
51 epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
52 events::StoredEvent,
53 obj_indices::StoredObjectVersion,
54 objects::{
55 StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
56 StoredObjects,
57 },
58 packages::StoredPackage,
59 transactions::{OptimisticTransaction, StoredTransaction, TxGlobalOrder},
60 tx_indices::TxIndexSplit,
61 watermarks::StoredWatermark,
62 },
63 on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
64 persist_chunk_into_table_in_existing_connection,
65 pruning::pruner::PrunableTable,
66 read_only_blocking, run_query, run_query_with_retry,
67 schema::{
68 chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
69 event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
70 event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
71 objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
72 transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
73 tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
74 tx_wrapped_or_deleted_objects, watermarks,
75 },
76 store::{IndexerStore, diesel_macro::mark_in_blocking_pool},
77 transactional_blocking_with_retry,
78 types::{
79 EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
80 IndexedPackage, IndexedTransaction, TxIndex,
81 },
82};
83
/// Cursor into the `tx_global_order` table, pairing the two sequence
/// numbers tracked per transaction (presumably the checkpoint-finalized
/// position and the optimistic position — confirm against the
/// `tx_global_order` schema and its writers).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TxGlobalOrderCursor {
    // Position in the global (finalized) transaction order.
    pub global_sequence_number: i64,
    // Position in the optimistic ordering.
    pub optimistic_sequence_number: i64,
}
91
/// Splits `$data` into `Vec<Vec<_>>` batches of at most `$size` elements.
///
/// Uses `itertools::Itertools::chunks`, so `Itertools` must be in scope at
/// the call site.
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}
103
/// Deletes every row of `$table` whose `tx_sequence_number` lies in the
/// inclusive range [`$min_tx`, `$max_tx`].
///
/// Errors are wrapped in `IndexerError` with `$context_msg` attached and
/// propagated via `?`, so this macro may only be used inside a function
/// returning `Result<_, IndexerError>`.
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}
112
// Max rows written per statement within a single DB transaction.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
// Default chunk size for parallel commits; overridable via the
// PG_COMMIT_PARALLEL_CHUNK_SIZE env var (see `PgIndexerStore::new`).
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
// Default chunk size for parallel object commits; overridable via the
// PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE env var.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
// Sleep duration handed to `transactional_blocking_with_retry!`.
// NOTE(review): one hour — confirm this back-off magnitude is intentional.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
125
/// Runtime-tunable chunk sizes for parallel commit paths, resolved from
/// environment variables (with compiled-in defaults) in
/// `PgIndexerStore::new`.
#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    // Chunk size for parallel commits of non-object data.
    pub parallel_chunk_size: usize,
    // Chunk size for parallel commits of object data.
    pub parallel_objects_chunk_size: usize,
}
131
/// Postgres-backed implementation of the indexer store.
///
/// Writes go through `blocking_cp` (a blocking diesel connection pool);
/// epoch-partitioned tables are managed by `partition_manager`.
pub struct PgIndexerStore {
    // Blocking connection pool used by all read/write paths.
    blocking_cp: ConnectionPool,
    // Prometheus-style metrics for commit latency and lag.
    metrics: IndexerMetrics,
    // Manages epoch-based table partitions.
    partition_manager: PgPartitionManager,
    // Parallelism/chunking knobs resolved at construction time.
    config: PgIndexerStoreConfig,
}
138
139impl Clone for PgIndexerStore {
140 fn clone(&self) -> PgIndexerStore {
141 Self {
142 blocking_cp: self.blocking_cp.clone(),
143 metrics: self.metrics.clone(),
144 partition_manager: self.partition_manager.clone(),
145 config: self.config.clone(),
146 }
147 }
148}
149
150impl PgIndexerStore {
151 pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
152 let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
153 .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
154 .parse::<usize>()
155 .unwrap();
156 let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
157 .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
158 .parse::<usize>()
159 .unwrap();
160 let partition_manager = PgPartitionManager::new(blocking_cp.clone())
161 .expect("failed to initialize partition manager");
162 let config = PgIndexerStoreConfig {
163 parallel_chunk_size,
164 parallel_objects_chunk_size,
165 };
166
167 Self {
168 blocking_cp,
169 metrics,
170 partition_manager,
171 config,
172 }
173 }
174
175 pub fn get_metrics(&self) -> IndexerMetrics {
176 self.metrics.clone()
177 }
178
179 pub fn blocking_cp(&self) -> ConnectionPool {
180 self.blocking_cp.clone()
181 }
182
    /// Returns the `(start, end)` range of protocol versions to index:
    /// one past the latest version already stored in `protocol_configs`,
    /// through the latest protocol version recorded in `epochs`.
    /// Both bounds default to 1 when the respective table is empty.
    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        // Next version to index: latest stored + 1, or 1 on an empty table.
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        // Upper bound: protocol version of the most recent epoch.
        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }
205
206 pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
207 read_only_blocking!(&self.blocking_cp, |conn| {
208 chain_identifier::dsl::chain_identifier
209 .select(chain_identifier::checkpoint_digest)
210 .first::<Vec<u8>>(conn)
211 .optional()
212 })
213 .context("Failed reading chain id from PostgresDB")
214 }
215
216 fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
217 read_only_blocking!(&self.blocking_cp, |conn| {
218 checkpoints::dsl::checkpoints
219 .select(max(checkpoints::sequence_number))
220 .first::<Option<i64>>(conn)
221 .map(|v| v.map(|v| v as u64))
222 })
223 .context("Failed reading latest checkpoint sequence number from PostgresDB")
224 }
225
226 fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
227 read_only_blocking!(&self.blocking_cp, |conn| {
228 checkpoints::dsl::checkpoints
229 .select((
230 min(checkpoints::sequence_number),
231 max(checkpoints::sequence_number),
232 ))
233 .first::<(Option<i64>, Option<i64>)>(conn)
234 .map(|(min, max)| {
235 (
236 min.unwrap_or_default() as u64,
237 max.unwrap_or_default() as u64,
238 )
239 })
240 })
241 .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
242 }
243
244 fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
245 read_only_blocking!(&self.blocking_cp, |conn| {
246 epochs::dsl::epochs
247 .select((min(epochs::epoch), max(epochs::epoch)))
248 .first::<(Option<i64>, Option<i64>)>(conn)
249 .map(|(min, max)| {
250 (
251 min.unwrap_or_default() as u64,
252 max.unwrap_or_default() as u64,
253 )
254 })
255 })
256 .context("Failed reading min and max epoch numbers from PostgresDB")
257 }
258
259 fn get_latest_object_snapshot_watermark(
260 &self,
261 ) -> Result<Option<CommitterWatermark>, IndexerError> {
262 read_only_blocking!(&self.blocking_cp, |conn| {
263 watermarks::table
264 .select((
265 watermarks::current_epoch,
266 watermarks::max_committed_cp,
267 watermarks::max_committed_tx,
268 ))
269 .filter(
270 watermarks::entity
271 .eq(ObjectsSnapshotHandlerTables::ObjectsSnapshot.to_string()),
272 )
273 .first::<(i64, i64, i64)>(conn)
274 .optional()
276 .map(|v| {
277 v.map(|(epoch, cp, tx)| CommitterWatermark {
278 current_epoch: epoch as u64,
279 max_committed_cp: cp as u64,
280 max_committed_tx: tx as u64,
281 })
282 })
283 })
284 .context("Failed reading latest object snapshot watermark from PostgresDB")
285 }
286
287 fn get_latest_object_snapshot_checkpoint_sequence_number(
288 &self,
289 ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
290 read_only_blocking!(&self.blocking_cp, |conn| {
291 objects_snapshot::table
292 .select(max(objects_snapshot::checkpoint_sequence_number))
293 .first::<Option<i64>>(conn)
294 .map(|v| v.map(|v| v as CheckpointSequenceNumber))
295 })
296 .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
297 }
298
299 fn persist_display_updates(
300 &self,
301 display_updates: BTreeMap<String, StoredDisplay>,
302 ) -> Result<(), IndexerError> {
303 transactional_blocking_with_retry!(
304 &self.blocking_cp,
305 {
306 let value = display_updates.values().collect::<Vec<_>>();
307 |conn| self.persist_displays_in_existing_transaction(conn, value)
308 },
309 PG_DB_COMMIT_SLEEP_DURATION
310 )?;
311
312 Ok(())
313 }
314
    /// Upserts live objects into the `objects` table via one array-bound
    /// `UNNEST` statement.
    ///
    /// Each object is bound together with the digest of the transaction that
    /// produced it. The `LEFT JOIN tx_global_order` filter only admits rows
    /// whose tx digest is either unknown to `tx_global_order` or has
    /// `optimistic_sequence_number = -1` (NOTE(review): presumably the
    /// "not applied optimistically" sentinel — confirm against the writer of
    /// that table). On conflict, a row is replaced only when the incoming
    /// `object_version` is strictly greater than the stored one.
    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        // Bind order must match both the UNNEST alias column list (15 columns,
        // including tx_digest) and the 14-column INSERT list (tx_digest is
        // consumed by the join, not inserted).
        let raw_query = r#"
            INSERT INTO objects (
                object_id,
                object_version,
                object_digest,
                owner_type,
                owner_id,
                object_type,
                object_type_package,
                object_type_module,
                object_type_name,
                serialized_object,
                coin_type,
                coin_balance,
                df_kind,
                finalized_in_cp
            )
            SELECT
                u.object_id,
                u.object_version,
                u.object_digest,
                u.owner_type,
                u.owner_id,
                u.object_type,
                u.object_type_package,
                u.object_type_module,
                u.object_type_name,
                u.serialized_object,
                u.coin_type,
                u.coin_balance,
                u.df_kind,
                u.finalized_in_cp
            FROM UNNEST(
                $1::BYTEA[],
                $2::BIGINT[],
                $3::BYTEA[],
                $4::SMALLINT[],
                $5::BYTEA[],
                $6::TEXT[],
                $7::BYTEA[],
                $8::TEXT[],
                $9::TEXT[],
                $10::BYTEA[],
                $11::TEXT[],
                $12::BIGINT[],
                $13::SMALLINT[],
                $14::BYTEA[],
                $15::BIGINT[]
            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest, finalized_in_cp)
            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
            ON CONFLICT (object_id) DO UPDATE
            SET
                object_version = EXCLUDED.object_version,
                object_digest = EXCLUDED.object_digest,
                owner_type = EXCLUDED.owner_type,
                owner_id = EXCLUDED.owner_id,
                object_type = EXCLUDED.object_type,
                object_type_package = EXCLUDED.object_type_package,
                object_type_module = EXCLUDED.object_type_module,
                object_type_name = EXCLUDED.object_type_name,
                serialized_object = EXCLUDED.serialized_object,
                coin_type = EXCLUDED.coin_type,
                coin_balance = EXCLUDED.coin_balance,
                df_kind = EXCLUDED.df_kind,
                finalized_in_cp = EXCLUDED.finalized_in_cp
            WHERE EXCLUDED.object_version > objects.object_version
        "#;
        // Split each LiveObject into its stored representation plus the raw
        // tx digest bytes; the two columns stay index-aligned via unzip.
        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
            .into_iter()
            .map(LiveObject::split)
            .map(|(indexed_object, tx_digest)| {
                (
                    StoredObject::from(indexed_object),
                    tx_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(objects.object_ids)
            .bind::<Array<BigInt>, _>(objects.object_versions)
            .bind::<Array<Bytea>, _>(objects.object_digests)
            .bind::<Array<SmallInt>, _>(objects.owner_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
            .bind::<Array<Bytea>, _>(objects.serialized_objects)
            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
            .bind::<Array<Bytea>, _>(tx_digests)
            .bind::<Array<Nullable<BigInt>>, _>(objects.finalized_in_cps);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // The query is cloned because the retry macro may run it again.
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object mutations with error: {e}");
        })
    }
431
    /// Deletes removed objects from the live `objects` table via one
    /// array-bound `UNNEST` statement.
    ///
    /// As in `persist_changed_objects`, the `LEFT JOIN tx_global_order`
    /// filter only admits removals whose tx digest is unknown to
    /// `tx_global_order` or carries `optimistic_sequence_number = -1`.
    /// A row is only deleted when its stored `object_version` is strictly
    /// lower than the removal's version, so a newer concurrent write
    /// survives.
    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            DELETE FROM objects
            USING (
                SELECT u.object_id, u.object_version
                FROM UNNEST(
                    $1::BYTEA[],
                    $2::BIGINT[],
                    $3::BYTEA[]
                ) AS u(object_id, object_version, tx_digest)
                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
            ) AS to_delete
            WHERE objects.object_id = to_delete.object_id
            AND objects.object_version < to_delete.object_version
        "#;
        // Three index-aligned bind arrays: id, version, producing tx digest.
        let (object_ids, versions, tx_digests): (Vec<_>, Vec<_>, Vec<_>) = objects
            .into_iter()
            .map(|removed_object| {
                (
                    removed_object.object_id().as_bytes().to_vec(),
                    removed_object.version() as i64,
                    removed_object.transaction_digest.into_inner().to_vec(),
                )
            })
            .multiunzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(object_ids)
            .bind::<Array<BigInt>, _>(versions)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // Cloned so the retry macro can re-execute on failure.
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object deletions with error: {e}");
        })
    }
483
    /// Upserts one chunk of mutated objects within an already-open
    /// transaction.
    ///
    /// The conditional upsert only overwrites an existing row when the
    /// incoming `object_version` is strictly greater than the stored one,
    /// mirroring the version guard in `persist_changed_objects`.
    fn persist_object_mutation_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        on_conflict_do_update_with_condition!(
            objects::table,
            mutated_object_mutation_chunk,
            objects::object_id,
            (
                objects::object_id.eq(excluded(objects::object_id)),
                objects::object_version.eq(excluded(objects::object_version)),
                objects::object_digest.eq(excluded(objects::object_digest)),
                objects::owner_type.eq(excluded(objects::owner_type)),
                objects::owner_id.eq(excluded(objects::owner_id)),
                objects::object_type.eq(excluded(objects::object_type)),
                objects::serialized_object.eq(excluded(objects::serialized_object)),
                objects::coin_type.eq(excluded(objects::coin_type)),
                objects::coin_balance.eq(excluded(objects::coin_balance)),
                objects::df_kind.eq(excluded(objects::df_kind)),
                objects::finalized_in_cp.eq(excluded(objects::finalized_in_cp)),
            ),
            // Overwrite only when the incoming version is newer.
            excluded(objects::object_version).gt(objects::object_version),
            conn
        );
        Ok::<(), IndexerError>(())
    }
511
512 fn persist_object_deletion_chunk_in_existing_transaction(
513 &self,
514 conn: &mut PgConnection,
515 deleted_objects_chunk: Vec<StoredDeletedObject>,
516 ) -> Result<(), IndexerError> {
517 let (object_ids, object_versions): (Vec<_>, Vec<_>) = deleted_objects_chunk
518 .iter()
519 .map(|o| (o.object_id.clone(), o.object_version))
520 .unzip();
521 let raw_query = r#"
522 DELETE FROM objects
523 USING UNNEST($1::BYTEA[], $2::BIGINT[]) AS to_delete(object_id, object_version)
524 WHERE objects.object_id = to_delete.object_id
525 AND objects.object_version < to_delete.object_version
526 "#;
527 diesel::sql_query(raw_query)
528 .bind::<Array<Bytea>, _>(object_ids)
529 .bind::<Array<BigInt>, _>(object_versions)
530 .execute(conn)
531 .map_err(IndexerError::from)
532 .context("Failed to write object deletion to PostgresDB")?;
533 Ok::<(), IndexerError>(())
534 }
535
    /// Upserts one chunk of rows into `objects_snapshot`, replacing every
    /// column unconditionally on `object_id` conflict (unlike the live
    /// `objects` table, there is no version guard here).
    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // Sub-chunk to stay under the per-statement row limit.
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object snapshot with error: {e}");
        })
    }
599
600 fn persist_objects_history_chunk(
601 &self,
602 stored_objects_history: Vec<StoredHistoryObject>,
603 ) -> Result<(), IndexerError> {
604 let guard = self
605 .metrics
606 .checkpoint_db_commit_latency_objects_history_chunks
607 .start_timer();
608 transactional_blocking_with_retry!(
609 &self.blocking_cp,
610 |conn| {
611 for stored_objects_history_chunk in
612 stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
613 {
614 insert_or_ignore_into!(
615 objects_history::table,
616 stored_objects_history_chunk,
617 conn
618 );
619 }
620 Ok::<(), IndexerError>(())
621 },
622 PG_DB_COMMIT_SLEEP_DURATION
623 )
624 .tap_ok(|_| {
625 let elapsed = guard.stop_and_record();
626 info!(
627 elapsed,
628 "Persisted {} chunked objects history",
629 stored_objects_history.len(),
630 );
631 })
632 .tap_err(|e| {
633 tracing::error!("failed to persist object history with error: {e}");
634 })
635 }
636
637 fn persist_object_version_chunk(
638 &self,
639 object_versions: Vec<StoredObjectVersion>,
640 ) -> Result<(), IndexerError> {
641 let guard = self
642 .metrics
643 .checkpoint_db_commit_latency_objects_version_chunks
644 .start_timer();
645
646 transactional_blocking_with_retry!(
647 &self.blocking_cp,
648 |conn| {
649 for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
650 {
651 insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
652 }
653 Ok::<(), IndexerError>(())
654 },
655 PG_DB_COMMIT_SLEEP_DURATION
656 )
657 .tap_ok(|_| {
658 let elapsed = guard.stop_and_record();
659 info!(
660 elapsed,
661 "Persisted {} chunked object versions",
662 object_versions.len(),
663 );
664 })
665 .tap_err(|e| {
666 tracing::error!("failed to persist object versions with error: {e}");
667 })
668 }
669
    /// Persists a batch of checkpoints plus their derived rows.
    ///
    /// Order of operations:
    /// 1. If the batch starts at the genesis checkpoint (sequence 0), store
    ///    protocol configs / feature flags and the chain identifier first.
    /// 2. Write `pruner_cp_watermark` rows (checkpoint -> tx range mapping).
    /// 3. Write the `checkpoints` rows themselves, updating lag metrics as
    ///    each chunk is committed.
    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        // Empty batch: nothing to do.
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        // Genesis bootstrap: persist protocol data and the chain identifier
        // (derived from the genesis checkpoint digest) exactly once.
        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest)?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        // Checkpoint -> transaction range rows consumed by the pruner.
        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    // Lag metrics: wall-clock now vs. each checkpoint's timestamp.
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics.max_committed_checkpoint_sequence_number.set(
                            stored_checkpoint.sequence_number,
                        );
                        self.metrics.committed_checkpoint_timestamp_ms.set(
                            stored_checkpoint.timestamp_ms,
                        );
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist checkpoints with error: {e}");
        })
    }
764
765 fn persist_transactions_chunk(
766 &self,
767 transactions: Vec<IndexedTransaction>,
768 ) -> Result<(), IndexerError> {
769 let guard = self
770 .metrics
771 .checkpoint_db_commit_latency_transactions_chunks
772 .start_timer();
773 let transformation_guard = self
774 .metrics
775 .checkpoint_db_commit_latency_transactions_chunks_transformation
776 .start_timer();
777 let transactions = transactions
778 .iter()
779 .map(StoredTransaction::from)
780 .collect::<Vec<_>>();
781 drop(transformation_guard);
782
783 transactional_blocking_with_retry!(
784 &self.blocking_cp,
785 |conn| {
786 for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
787 insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
788 }
789 Ok::<(), IndexerError>(())
790 },
791 PG_DB_COMMIT_SLEEP_DURATION
792 )
793 .tap_ok(|_| {
794 let elapsed = guard.stop_and_record();
795 info!(
796 elapsed,
797 "Persisted {} chunked transactions",
798 transactions.len()
799 );
800 })
801 .tap_err(|e| {
802 tracing::error!("failed to persist transactions with error: {e}");
803 })
804 }
805
    /// Upserts one chunk of rows into `tx_global_order`.
    ///
    /// On `tx_digest` conflict, `chk_tx_sequence_number` is updated only
    /// when the stored value is still NULL — i.e. the finalized sequence
    /// number is written at most once and never overwritten.
    fn persist_tx_global_order_chunk(
        &self,
        tx_order: Vec<TxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // Sub-chunk to stay under the per-statement row limit.
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update_with_condition!(
                        tx_global_order::table,
                        tx_order_chunk,
                        tx_global_order::tx_digest,
                        tx_global_order::chk_tx_sequence_number
                            .eq(excluded(tx_global_order::chk_tx_sequence_number)),
                        // Only fill in a not-yet-set sequence number.
                        tx_global_order::chk_tx_sequence_number.is_null(),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist txs insertion order with error: {e}");
        })
    }
848
849 fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
850 let guard = self
851 .metrics
852 .checkpoint_db_commit_latency_events_chunks
853 .start_timer();
854 let len = events.len();
855 let events = events
856 .into_iter()
857 .map(StoredEvent::from)
858 .collect::<Vec<_>>();
859
860 transactional_blocking_with_retry!(
861 &self.blocking_cp,
862 |conn| {
863 for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
864 insert_or_ignore_into!(events::table, event_chunk, conn);
865 }
866 Ok::<(), IndexerError>(())
867 },
868 PG_DB_COMMIT_SLEEP_DURATION
869 )
870 .tap_ok(|_| {
871 let elapsed = guard.stop_and_record();
872 info!(elapsed, "Persisted {} chunked events", len);
873 })
874 .tap_err(|e| {
875 tracing::error!("failed to persist events with error: {e}");
876 })
877 }
878
879 fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
880 if packages.is_empty() {
881 return Ok(());
882 }
883 let guard = self
884 .metrics
885 .checkpoint_db_commit_latency_packages
886 .start_timer();
887 let packages = packages
888 .into_iter()
889 .map(StoredPackage::from)
890 .collect::<Vec<_>>();
891 transactional_blocking_with_retry!(
892 &self.blocking_cp,
893 |conn| {
894 for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
895 on_conflict_do_update!(
896 packages::table,
897 packages_chunk,
898 packages::package_id,
899 (
900 packages::package_id.eq(excluded(packages::package_id)),
901 packages::move_package.eq(excluded(packages::move_package)),
902 ),
903 conn
904 );
905 }
906 Ok::<(), IndexerError>(())
907 },
908 PG_DB_COMMIT_SLEEP_DURATION
909 )
910 .tap_ok(|_| {
911 let elapsed = guard.stop_and_record();
912 info!(elapsed, "Persisted {} packages", packages.len());
913 })
914 .tap_err(|e| {
915 tracing::error!("failed to persist packages with error: {e}");
916 })
917 }
918
919 async fn persist_event_indices_chunk(
920 &self,
921 indices: Vec<EventIndex>,
922 ) -> Result<(), IndexerError> {
923 let guard = self
924 .metrics
925 .checkpoint_db_commit_latency_event_indices_chunks
926 .start_timer();
927 let len = indices.len();
928 let (
929 event_emit_packages,
930 event_emit_modules,
931 event_senders,
932 event_struct_packages,
933 event_struct_modules,
934 event_struct_names,
935 event_struct_instantiations,
936 ) = indices.into_iter().map(|i| i.split()).fold(
937 (
938 Vec::new(),
939 Vec::new(),
940 Vec::new(),
941 Vec::new(),
942 Vec::new(),
943 Vec::new(),
944 Vec::new(),
945 ),
946 |(
947 mut event_emit_packages,
948 mut event_emit_modules,
949 mut event_senders,
950 mut event_struct_packages,
951 mut event_struct_modules,
952 mut event_struct_names,
953 mut event_struct_instantiations,
954 ),
955 index| {
956 event_emit_packages.push(index.0);
957 event_emit_modules.push(index.1);
958 event_senders.push(index.2);
959 event_struct_packages.push(index.3);
960 event_struct_modules.push(index.4);
961 event_struct_names.push(index.5);
962 event_struct_instantiations.push(index.6);
963 (
964 event_emit_packages,
965 event_emit_modules,
966 event_senders,
967 event_struct_packages,
968 event_struct_modules,
969 event_struct_names,
970 event_struct_instantiations,
971 )
972 },
973 );
974
975 let mut futures = vec![];
977 futures.push(self.spawn_blocking_task(move |this| {
978 persist_chunk_into_table!(
979 event_emit_package::table,
980 event_emit_packages,
981 &this.blocking_cp
982 )
983 }));
984
985 futures.push(self.spawn_blocking_task(move |this| {
986 persist_chunk_into_table!(
987 event_emit_module::table,
988 event_emit_modules,
989 &this.blocking_cp
990 )
991 }));
992
993 futures.push(self.spawn_blocking_task(move |this| {
994 persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
995 }));
996
997 futures.push(self.spawn_blocking_task(move |this| {
998 persist_chunk_into_table!(
999 event_struct_package::table,
1000 event_struct_packages,
1001 &this.blocking_cp
1002 )
1003 }));
1004
1005 futures.push(self.spawn_blocking_task(move |this| {
1006 persist_chunk_into_table!(
1007 event_struct_module::table,
1008 event_struct_modules,
1009 &this.blocking_cp
1010 )
1011 }));
1012
1013 futures.push(self.spawn_blocking_task(move |this| {
1014 persist_chunk_into_table!(
1015 event_struct_name::table,
1016 event_struct_names,
1017 &this.blocking_cp
1018 )
1019 }));
1020
1021 futures.push(self.spawn_blocking_task(move |this| {
1022 persist_chunk_into_table!(
1023 event_struct_instantiation::table,
1024 event_struct_instantiations,
1025 &this.blocking_cp
1026 )
1027 }));
1028
1029 futures::future::try_join_all(futures)
1030 .await
1031 .map_err(|e| {
1032 tracing::error!("failed to join event indices futures in a chunk: {e}");
1033 IndexerError::from(e)
1034 })?
1035 .into_iter()
1036 .collect::<Result<Vec<_>, _>>()
1037 .map_err(|e| {
1038 IndexerError::PostgresWrite(format!(
1039 "Failed to persist all event indices in a chunk: {e:?}"
1040 ))
1041 })?;
1042 let elapsed = guard.stop_and_record();
1043 info!(elapsed, "Persisted {} chunked event indices", len);
1044 Ok(())
1045 }
1046
1047 async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1048 let guard = self
1049 .metrics
1050 .checkpoint_db_commit_latency_tx_indices_chunks
1051 .start_timer();
1052 let len = indices.len();
1053
1054 let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();
1055
1056 let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
1057 let recipients: Vec<_> = splits
1058 .iter()
1059 .flat_map(|ix| ix.tx_recipients.clone())
1060 .collect();
1061 let input_objects: Vec<_> = splits
1062 .iter()
1063 .flat_map(|ix| ix.tx_input_objects.clone())
1064 .collect();
1065 let changed_objects: Vec<_> = splits
1066 .iter()
1067 .flat_map(|ix| ix.tx_changed_objects.clone())
1068 .collect();
1069 let wrapped_or_deleted_objects: Vec<_> = splits
1070 .iter()
1071 .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
1072 .collect();
1073 let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
1074 let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
1075 let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
1076 let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1077 let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1078
1079 let futures = [
1080 self.spawn_blocking_task(move |this| {
1081 persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1082 }),
1083 self.spawn_blocking_task(move |this| {
1084 persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1085 }),
1086 self.spawn_blocking_task(move |this| {
1087 persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1088 }),
1089 self.spawn_blocking_task(move |this| {
1090 persist_chunk_into_table!(
1091 tx_changed_objects::table,
1092 changed_objects,
1093 &this.blocking_cp
1094 )
1095 }),
1096 self.spawn_blocking_task(move |this| {
1097 persist_chunk_into_table!(
1098 tx_wrapped_or_deleted_objects::table,
1099 wrapped_or_deleted_objects,
1100 &this.blocking_cp
1101 )
1102 }),
1103 self.spawn_blocking_task(move |this| {
1104 persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1105 }),
1106 self.spawn_blocking_task(move |this| {
1107 persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1108 }),
1109 self.spawn_blocking_task(move |this| {
1110 persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1111 }),
1112 self.spawn_blocking_task(move |this| {
1113 persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1114 }),
1115 self.spawn_blocking_task(move |this| {
1116 persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1117 }),
1118 ];
1119
1120 futures::future::try_join_all(futures)
1121 .await
1122 .map_err(|e| {
1123 tracing::error!("failed to join tx indices futures in a chunk: {e}");
1124 IndexerError::from(e)
1125 })?
1126 .into_iter()
1127 .collect::<Result<Vec<_>, _>>()
1128 .map_err(|e| {
1129 IndexerError::PostgresWrite(format!(
1130 "Failed to persist all tx indices in a chunk: {e:?}"
1131 ))
1132 })?;
1133 let elapsed = guard.stop_and_record();
1134 info!(elapsed, "Persisted {} chunked tx_indices", len);
1135 Ok(())
1136 }
1137
    /// Persists an epoch transition: updates the previous epoch's row (if
    /// any) with its end-of-epoch data, then inserts the new epoch's row.
    /// Both writes happen in one retried transaction.
    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                if let Some(last_epoch) = &epoch.last_epoch {
                    info!(last_epoch.epoch, "Persisting epoch end data.");
                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
                        .set(last_epoch)
                        .execute(conn)?;
                }

                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
                // Insert-or-ignore keeps the write idempotent across retries
                // and restarts.
                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist epoch with error: {e}");
        })
    }
1169
1170 fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1171 let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1172 if let Some(last_epoch_id) = last_epoch_id {
1174 let last_db_epoch: Option<StoredEpochInfo> =
1175 read_only_blocking!(&self.blocking_cp, |conn| {
1176 epochs::table
1177 .filter(epochs::epoch.eq(last_epoch_id))
1178 .first::<StoredEpochInfo>(conn)
1179 .optional()
1180 })
1181 .context("Failed to read last epoch from PostgresDB")?;
1182 if let Some(last_epoch) = last_db_epoch {
1183 let epoch_partition_data =
1184 EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1185 let table_partitions = self.partition_manager.get_table_partitions()?;
1186 for (table, (_, last_partition)) in table_partitions {
1187 if !self
1189 .partition_manager
1190 .get_strategy(&table)
1191 .is_epoch_partitioned()
1192 {
1193 continue;
1194 }
1195 let guard = self.metrics.advance_epoch_latency.start_timer();
1196 self.partition_manager.advance_epoch(
1197 table.clone(),
1198 last_partition,
1199 &epoch_partition_data,
1200 )?;
1201 let elapsed = guard.stop_and_record();
1202 info!(
1203 elapsed,
1204 "Advanced epoch partition {} for table {}",
1205 last_partition,
1206 table.clone()
1207 );
1208 }
1209 } else {
1210 tracing::error!("last epoch: {last_epoch_id} from PostgresDB is None.");
1211 }
1212 }
1213
1214 Ok(())
1215 }
1216
    /// Deletes checkpoints whose sequence number lies in the inclusive range
    /// `[min_cp, max_cp]`, inside a retried transaction.
    fn prune_checkpoints_table_by_range(
        &self,
        min_cp: u64,
        max_cp: u64,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                // Sequence numbers are stored as BIGINT, hence the casts.
                diesel::delete(
                    checkpoints::table
                        .filter(checkpoints::sequence_number.between(min_cp as i64, max_cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table by range")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1238
1239 fn prune_tx_global_order(
1242 &self,
1243 conn: &mut PgConnection,
1244 min_tx: i64,
1245 max_tx: i64,
1246 ) -> Result<(), IndexerError> {
1247 diesel::delete(
1248 tx_global_order::table
1249 .filter(tx_global_order::chk_tx_sequence_number.between(min_tx, max_tx)),
1250 )
1251 .execute(conn)
1252 .map_err(IndexerError::from)
1253 .context("Failed to prune tx_global_order table")
1254 .map(|_| ())
1255 }
1256
    /// Deletes rows with tx sequence numbers in the inclusive range
    /// `[min_tx, max_tx]` from one transaction- or event-index table,
    /// selected by `table`, inside a retried transaction.
    ///
    /// Returns `InvalidArgument` when `table` is not one of the tx/event
    /// index tables handled here.
    fn prune_single_tx_or_event_table(
        &self,
        table: &crate::pruning::pruner::PrunableTable,
        min_tx: u64,
        max_tx: u64,
    ) -> Result<(), IndexerError> {
        use crate::pruning::pruner::PrunableTable;

        // These tables store sequence numbers as BIGINT; convert once.
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                match table {
                    PrunableTable::EventEmitModule => {
                        prune_tx_or_event_indice_table!(
                            event_emit_module,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_emit_module table"
                        );
                    }
                    PrunableTable::EventEmitPackage => {
                        prune_tx_or_event_indice_table!(
                            event_emit_package,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_emit_package table"
                        );
                    }
                    PrunableTable::EventSenders => {
                        prune_tx_or_event_indice_table!(
                            event_senders,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_senders table"
                        );
                    }
                    PrunableTable::EventStructInstantiation => {
                        prune_tx_or_event_indice_table!(
                            event_struct_instantiation,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_instantiation table"
                        );
                    }
                    PrunableTable::EventStructModule => {
                        prune_tx_or_event_indice_table!(
                            event_struct_module,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_module table"
                        );
                    }
                    PrunableTable::EventStructName => {
                        prune_tx_or_event_indice_table!(
                            event_struct_name,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_name table"
                        );
                    }
                    PrunableTable::EventStructPackage => {
                        prune_tx_or_event_indice_table!(
                            event_struct_package,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune event_struct_package table"
                        );
                    }

                    PrunableTable::TxSenders => {
                        prune_tx_or_event_indice_table!(
                            tx_senders,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_senders table"
                        );
                    }
                    PrunableTable::TxRecipients => {
                        prune_tx_or_event_indice_table!(
                            tx_recipients,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_recipients table"
                        );
                    }
                    PrunableTable::TxInputObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_input_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_input_objects table"
                        );
                    }
                    PrunableTable::TxChangedObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_changed_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_changed_objects table"
                        );
                    }
                    PrunableTable::TxWrappedOrDeletedObjects => {
                        prune_tx_or_event_indice_table!(
                            tx_wrapped_or_deleted_objects,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_wrapped_or_deleted_objects table"
                        );
                    }
                    PrunableTable::TxCallsPkg => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_pkg,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_pkg table"
                        );
                    }
                    PrunableTable::TxCallsMod => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_mod,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_mod table"
                        );
                    }
                    PrunableTable::TxCallsFun => {
                        prune_tx_or_event_indice_table!(
                            tx_calls_fun,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_calls_fun table"
                        );
                    }
                    PrunableTable::TxDigests => {
                        prune_tx_or_event_indice_table!(
                            tx_digests,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_digests table"
                        );
                    }
                    PrunableTable::TxKinds => {
                        prune_tx_or_event_indice_table!(
                            tx_kinds,
                            conn,
                            min_tx,
                            max_tx,
                            "Failed to prune tx_kinds table"
                        );
                    }
                    PrunableTable::TxGlobalOrder => {
                        // Pruned via a dedicated helper (composite key), but
                        // still inside this retried transaction.
                        self.prune_tx_global_order(conn, min_tx, max_tx)?;
                    }
                    _ => {
                        return Err(IndexerError::InvalidArgument(format!(
                            "table {} is not a transaction or event index table",
                            table.as_ref()
                        )));
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1443
    /// Deletes up to `limit` rows from `optimistic_transactions` whose
    /// `global_sequence_number` lies in the inclusive range `[start, end]`,
    /// returning the number of rows actually deleted.
    ///
    /// The CTE selects the victim keys with `FOR UPDATE` (row locks) before
    /// deleting them, bounding each pass to `limit` rows so a single call
    /// never holds locks on an unbounded set.
    // NOTE(review): the locking clause appears before LIMIT here; PostgreSQL's
    // grammar accepts either order — confirm against the deployed PG version.
    fn prune_optimistic_tx_by_global_seq(
        &self,
        start: u64,
        end: u64,
        limit: i64,
    ) -> Result<usize, IndexerError> {
        use diesel::prelude::*;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                let sql = r#"
                WITH ids_to_delete AS (
                    SELECT global_sequence_number, optimistic_sequence_number
                    FROM optimistic_transactions
                    WHERE global_sequence_number BETWEEN $1 AND $2
                    ORDER BY global_sequence_number, optimistic_sequence_number
                    FOR UPDATE
                    LIMIT $3
                )
                DELETE FROM optimistic_transactions otx
                USING ids_to_delete
                WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
                      (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
                "#;
                diesel::sql_query(sql)
                    .bind::<diesel::sql_types::BigInt, _>(start as i64)
                    .bind::<diesel::sql_types::BigInt, _>(end as i64)
                    .bind::<diesel::sql_types::BigInt, _>(limit)
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context(
                        format!(
                            "failed to prune optimistic_transactions table by global_sequence_number range [{start}..={end}] with limit {limit}"
                        )
                        .as_str(),
                    )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1487
    /// Deletes `pruner_cp_watermark` rows whose checkpoint sequence number
    /// lies in the inclusive range `[min_cp, max_cp]`, in a retried
    /// transaction.
    fn prune_cp_tx_table_by_range(&self, min_cp: u64, max_cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    pruner_cp_watermark::table.filter(
                        pruner_cp_watermark::checkpoint_sequence_number
                            .between(min_cp as i64, max_cp as i64),
                    ),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune pruner_cp_watermark table by range")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1506
1507 fn prune_table_by_checkpoint_range(
1508 &self,
1509 table: &crate::pruning::pruner::PrunableTable,
1510 min_checkpoint: u64,
1511 max_checkpoint: u64,
1512 ) -> Result<(), IndexerError> {
1513 use crate::pruning::pruner::PrunableTable;
1514
1515 match table {
1516 PrunableTable::Checkpoints => {
1517 self.prune_checkpoints_table_by_range(min_checkpoint, max_checkpoint)
1518 }
1519 PrunableTable::PrunerCpWatermark => {
1520 self.prune_cp_tx_table_by_range(min_checkpoint, max_checkpoint)
1521 }
1522 _ => Err(IndexerError::InvalidArgument(format!(
1523 "table {} is not pruned by checkpoint",
1524 table.as_ref()
1525 ))),
1526 }
1527 }
1528
    /// Returns the network-wide cumulative transaction count recorded for
    /// `epoch`, or `Err` if the epoch row does not exist. The column is
    /// nullable, so `Ok(None)` means the epoch has no recorded total yet.
    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::table
                .filter(epochs::epoch.eq(epoch as i64))
                .select(epochs::network_total_transactions)
                .get_result::<Option<i64>>(conn)
        })
        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
        .map(|option| option.map(|v| v as u64))
    }
1542
    /// Refreshes the `participation_metrics` materialized view so reads see
    /// up-to-date aggregates; retried on transient DB failures.
    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("failed to refresh participation_metrics: {e}");
        })
    }
1560
    /// Upserts the committer watermark (upper bound) for every entity of `E`.
    ///
    /// The upsert only takes effect when ALL of the new epoch/checkpoint/tx
    /// values are >= the stored ones (the `filter` calls become the upsert's
    /// WHERE clause), so concurrent or replayed writers can never move a
    /// watermark backwards.
    fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        // FilterDsl is needed to attach a WHERE clause to the DO UPDATE.
        use diesel::query_dsl::methods::FilterDsl;

        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();

        // One row per entity name, all sharing the same watermark values.
        let upper_bound_updates = E::iter()
            .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&upper_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::current_epoch.eq(excluded(watermarks::current_epoch)),
                        watermarks::max_committed_cp.eq(excluded(watermarks::max_committed_cp)),
                        watermarks::max_committed_tx.eq(excluded(watermarks::max_committed_tx)),
                    ))
                    .filter(excluded(watermarks::max_committed_cp).ge(watermarks::max_committed_cp))
                    .filter(excluded(watermarks::max_committed_tx).ge(watermarks::max_committed_tx))
                    .filter(excluded(watermarks::current_epoch).ge(watermarks::current_epoch))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("Failed to update watermarks upper bound")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted watermarks");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {}", e);
        })
    }
1609
    /// Maps each requested epoch to its (first checkpoint id, first tx
    /// sequence number) pair, read from the `epochs` table.
    ///
    /// Epochs missing from the DB are silently absent from the result map;
    /// callers must handle the missing-key case.
    fn map_epochs_to_cp_tx(
        &self,
        epochs: &[u64],
    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
        let pool = &self.blocking_cp;
        let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
            epochs::table
                .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
                .select((
                    epochs::epoch,
                    epochs::first_checkpoint_id,
                    epochs::first_tx_sequence_number,
                ))
                .load::<(i64, i64, i64)>(conn)
        })
        .context("Failed to fetch first checkpoint and tx seq num for epochs")?;

        Ok(results
            .into_iter()
            .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
            .collect())
    }
1632
    /// Upserts pruner watermarks (lower bounds) for the given tables.
    ///
    /// Each `(table, epoch)` pair is resolved to the epoch's first checkpoint
    /// and tx sequence number via [`Self::map_epochs_to_cp_tx`]; an epoch
    /// missing from the DB is treated as data corruption. The upsert only
    /// takes effect when every new bound is strictly greater than the stored
    /// one AND the DB clock has advanced past the last update timestamp, so
    /// lower bounds advance monotonically.
    fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        // FilterDsl is needed to attach a WHERE clause to the DO UPDATE.
        use diesel::query_dsl::methods::FilterDsl;

        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
            .into_iter()
            .map(|(table, epoch)| {
                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
                    IndexerError::PersistentStorageDataCorruption(format!(
                        "epoch {epoch} not found in epoch mapping",
                    ))
                })?;
                Ok(StoredWatermark::from_lower_bound_update(
                    table.as_ref(),
                    epoch,
                    *checkpoint,
                    *tx,
                ))
            })
            .collect();
        let lower_bound_updates = lookups?;
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_watermarks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::insert_into(watermarks::table)
                    .values(&lower_bound_updates)
                    .on_conflict(watermarks::entity)
                    .do_update()
                    .set((
                        watermarks::min_available_cp.eq(excluded(watermarks::min_available_cp)),
                        watermarks::min_available_tx.eq(excluded(watermarks::min_available_tx)),
                        watermarks::min_available_epoch
                            .eq(excluded(watermarks::min_available_epoch)),
                        // Timestamp comes from the DB clock, not the indexer's.
                        watermarks::min_bounds_updated_at_timestamp_ms.eq(sql::<
                            diesel::sql_types::BigInt,
                        >(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )),
                    ))
                    .filter(excluded(watermarks::min_available_cp).gt(watermarks::min_available_cp))
                    .filter(excluded(watermarks::min_available_tx).gt(watermarks::min_available_tx))
                    .filter(
                        excluded(watermarks::min_available_epoch)
                            .gt(watermarks::min_available_epoch),
                    )
                    .filter(
                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                        )
                        .gt(watermarks::min_bounds_updated_at_timestamp_ms),
                    )
                    .execute(conn)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            tracing::info!(elapsed, "Persisted watermarks lower bounds");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist watermarks with error: {}", e);
        })?;
        Ok(())
    }
1705
    /// Loads every watermark row together with the database's current time in
    /// epoch milliseconds. The timestamp is read from the DB (not the local
    /// clock) so staleness can be judged against the same clock that wrote
    /// `min_bounds_updated_at_timestamp_ms`.
    fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        run_query_with_retry!(
            &self.blocking_cp,
            |conn| {
                let stored = watermarks::table
                    .load::<StoredWatermark>(conn)
                    .map_err(Into::into)
                    .context("Failed reading watermarks from PostgresDB")?;
                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                ))
                .get_result(conn)
                .map_err(Into::into)
                .context("Failed reading current timestamp from PostgresDB")?;
                Ok::<_, IndexerError>((stored, timestamp))
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1727
    /// Records the pruner's progress for `table` by setting its
    /// `lowest_unpruned_key` watermark, in a retried transaction.
    fn update_watermark_lowest_unpruned_key(
        &self,
        table: &PrunableTable,
        lowest_unpruned_key: u64,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::update(watermarks::table.filter(watermarks::entity.eq(table.as_ref())))
                    .set(watermarks::lowest_unpruned_key.eq(lowest_unpruned_key as i64))
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context("failed to update watermark lowest_unpruned_key")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1746
    /// Returns the watermark row for `entity`, or `Ok(None)` when no row
    /// exists; retried on transient DB failures.
    fn get_watermark_by_entity(
        &self,
        entity: &str,
    ) -> Result<Option<StoredWatermark>, IndexerError> {
        run_query_with_retry!(
            &self.blocking_cp,
            |conn| {
                watermarks::table
                    .filter(watermarks::entity.eq(entity))
                    .first::<StoredWatermark>(conn)
                    .optional()
                    .map_err(Into::into)
                    .context("failed reading watermark by entity from PostgresDB")
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }
1764
1765 async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1766 where
1767 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1768 R: Send + 'static,
1769 {
1770 let this = self.clone();
1771 let current_span = tracing::Span::current();
1772 tokio::task::spawn_blocking(move || {
1773 mark_in_blocking_pool();
1774 let _guard = current_span.enter();
1775 f(this)
1776 })
1777 .await
1778 .map_err(Into::into)
1779 .and_then(std::convert::identity)
1780 }
1781
1782 fn spawn_blocking_task<F, R>(
1783 &self,
1784 f: F,
1785 ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1786 where
1787 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1788 R: Send + 'static,
1789 {
1790 let this = self.clone();
1791 let current_span = tracing::Span::current();
1792 let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1793 tokio::task::spawn_blocking(move || {
1794 mark_in_blocking_pool();
1795 let _guard = current_span.enter();
1796 let _elapsed = guard.stop_and_record();
1797 f(this)
1798 })
1799 }
1800
1801 fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1802 where
1803 F: FnOnce(Self) -> Fut + Send + 'static,
1804 Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1805 R: Send + 'static,
1806 {
1807 let this = self.clone();
1808 tokio::task::spawn(async move { f(this).await })
1809 }
1810}
1811
1812#[async_trait]
1813impl IndexerStore for PgIndexerStore {
1814 async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1815 self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1816 .await
1817 }
1818
1819 async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1820 self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1821 .await
1822 }
1823
1824 async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1825 self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1826 .await
1827 }
1828
1829 async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1830 self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1831 .await
1832 }
1833
1834 async fn get_latest_object_snapshot_watermark(
1835 &self,
1836 ) -> Result<Option<CommitterWatermark>, IndexerError> {
1837 self.execute_in_blocking_worker(|this| this.get_latest_object_snapshot_watermark())
1838 .await
1839 }
1840
1841 async fn get_latest_object_snapshot_checkpoint_sequence_number(
1842 &self,
1843 ) -> Result<Option<CheckpointSequenceNumber>, IndexerError> {
1844 self.execute_in_blocking_worker(|this| {
1845 this.get_latest_object_snapshot_checkpoint_sequence_number()
1846 })
1847 .await
1848 }
1849
1850 fn persist_objects_in_existing_transaction(
1851 &self,
1852 conn: &mut PgConnection,
1853 object_changes: Vec<TransactionObjectChangesToCommit>,
1854 ) -> Result<(), IndexerError> {
1855 if object_changes.is_empty() {
1856 return Ok(());
1857 }
1858
1859 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1860 let object_mutations = indexed_mutations
1861 .into_iter()
1862 .map(StoredObject::from)
1863 .collect::<Vec<_>>();
1864 let object_deletions = indexed_deletions
1865 .into_iter()
1866 .map(StoredDeletedObject::from)
1867 .collect::<Vec<_>>();
1868
1869 self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1870 self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1871
1872 Ok(())
1873 }
1874
    /// Writes deduplicated latest object states (mutations and deletions)
    /// into the objects snapshot table, chunked and persisted concurrently on
    /// the blocking pool. No-op for empty input.
    async fn persist_objects_snapshot(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot
            .start_timer();
        // Keep only the latest state per object before writing.
        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let objects_snapshot = indexed_mutations
            .into_iter()
            .map(StoredObjectSnapshot::try_from)
            .chain(
                indexed_deletions
                    .into_iter()
                    .map(|o| Ok(StoredObjectSnapshot::from(o))),
            )
            .collect::<Result<Vec<_>, _>>()?;
        let len = objects_snapshot.len();
        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));

        // First `?` surfaces join/panic errors, second surfaces DB errors.
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects snapshot chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects snapshot", len);
        Ok(())
    }
1919
    /// Persists object history rows, chunked and written concurrently on the
    /// blocking pool.
    ///
    /// Setting the `SKIP_OBJECT_HISTORY` env var to "true" (case-insensitive)
    /// skips the write entirely; empty input is a no-op.
    async fn persist_object_history(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        // Escape hatch: allow operators to disable history writes at runtime.
        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
            .map(|val| val.eq_ignore_ascii_case("true"))
            .unwrap_or(false);
        if skip_history {
            info!("skipping object history");
            return Ok(());
        }

        if object_changes.is_empty() {
            return Ok(());
        }
        let objects = make_objects_history_to_commit(object_changes)?;
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history
            .start_timer();

        let len = objects.len();
        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));

        // First `?` surfaces join/panic errors, second surfaces DB errors.
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects history chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects history", len);
        Ok(())
    }
1964
    /// Persists object version rows, chunked and written concurrently on the
    /// blocking pool. No-op for empty input.
    async fn persist_object_versions(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        if object_versions.is_empty() {
            return Ok(());
        }

        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version
            .start_timer();

        let object_versions_count = object_versions.len();

        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
            .collect::<Vec<_>>();

        // First `?` surfaces join/panic errors, second surfaces DB errors.
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_object_version_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects version chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {object_versions_count} object versions");
        Ok(())
    }
2003
2004 async fn persist_checkpoints(
2005 &self,
2006 checkpoints: Vec<IndexedCheckpoint>,
2007 ) -> Result<(), IndexerError> {
2008 self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
2009 .await
2010 }
2011
    /// Persists indexed transactions, chunked and written concurrently on the
    /// blocking pool.
    async fn persist_transactions(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions
            .start_timer();
        let len = transactions.len();

        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));

        // First `?` surfaces join/panic errors, second surfaces DB errors.
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_transactions_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all transactions chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} transactions", len);
        Ok(())
    }
2044
    /// Inserts one optimistic transaction using the caller's connection (and
    /// thus the caller's transaction); insert-or-ignore keeps the write
    /// idempotent.
    fn persist_optimistic_transaction_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
        Ok(())
    }
2053
    /// Persists indexed events, chunked and written concurrently on the
    /// blocking pool. No-op for empty input.
    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        if events.is_empty() {
            return Ok(());
        }
        let len = events.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events
            .start_timer();
        let chunks = chunk!(events, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));

        // First `?` surfaces join/panic errors, second surfaces DB errors.
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_events_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} events", len);
        Ok(())
    }
2083
2084 async fn persist_displays(
2085 &self,
2086 display_updates: BTreeMap<String, StoredDisplay>,
2087 ) -> Result<(), IndexerError> {
2088 if display_updates.is_empty() {
2089 return Ok(());
2090 }
2091
2092 self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
2093 .await?
2094 }
2095
    /// Upserts display rows using the caller's connection (and thus the
    /// caller's transaction). On conflict the row is only overwritten when
    /// the incoming version is strictly greater than the stored one, so
    /// stale updates never win. No-op for empty input.
    fn persist_displays_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        display_updates: Vec<&StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        on_conflict_do_update_with_condition!(
            display::table,
            display_updates,
            display::object_type,
            (
                display::id.eq(excluded(display::id)),
                display::version.eq(excluded(display::version)),
                display::bcs.eq(excluded(display::bcs)),
            ),
            excluded(display::version).gt(display::version),
            conn
        );

        Ok(())
    }
2120
2121 async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
2122 if packages.is_empty() {
2123 return Ok(());
2124 }
2125 self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
2126 .await
2127 }
2128
    /// Persists event indices, chunked; each chunk is handled by an async
    /// task that internally fans out to the blocking pool. No-op for empty
    /// input.
    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        // Chunk persistence is itself async (spawns its own blocking tasks),
        // so use spawn_task rather than spawn_blocking_task here.
        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_event_indices_chunk(chunk).await
            })
        });

        // First `?` surfaces join/panic errors, second surfaces DB errors.
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} event_indices chunks", len);
        Ok(())
    }
2163
2164 async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2165 self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2166 .await
2167 }
2168
2169 async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2170 self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2171 .await
2172 }
2173
2174 async fn get_network_total_transactions_by_end_of_epoch(
2175 &self,
2176 epoch: u64,
2177 ) -> Result<Option<u64>, IndexerError> {
2178 self.execute_in_blocking_worker(move |this| {
2179 this.get_network_total_transactions_by_end_of_epoch(epoch)
2180 })
2181 .await
2182 }
2183
2184 async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2185 self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2186 .await
2187 }
2188
2189 async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
2190 &self,
2191 watermark: CommitterWatermark,
2192 ) -> Result<(), IndexerError>
2193 where
2194 E::Iterator: Iterator<Item: AsRef<str>>,
2195 {
2196 self.execute_in_blocking_worker(move |this| {
2197 this.update_watermarks_upper_bound::<E>(watermark)
2198 })
2199 .await
2200 }
2201
    /// Exposes the store as `&dyn Any` so callers can downcast to the
    /// concrete store type.
    fn as_any(&self) -> &dyn StdAny {
        self
    }
2205
2206 fn persist_protocol_configs_and_feature_flags(
2209 &self,
2210 chain_id: Vec<u8>,
2211 ) -> Result<(), IndexerError> {
2212 let chain_id = ChainIdentifier::from(
2213 CheckpointDigest::from_bytes(chain_id).expect("unable to convert chain id"),
2214 );
2215
2216 let mut all_configs = vec![];
2217 let mut all_flags = vec![];
2218
2219 let (start_version, end_version) = self.get_protocol_version_index_range()?;
2220 info!(
2221 "Persisting protocol configs with start_version: {}, end_version: {}",
2222 start_version, end_version
2223 );
2224
2225 for version in start_version..=end_version {
2228 let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2229 (version as u64).into(),
2230 chain_id.chain(),
2231 )
2232 .ok_or(IndexerError::Generic(format!(
2233 "Unable to fetch protocol version {} and chain {:?}",
2234 version,
2235 chain_id.chain()
2236 )))?;
2237 let configs_vec = protocol_configs
2238 .attr_map()
2239 .into_iter()
2240 .map(|(k, v)| StoredProtocolConfig {
2241 protocol_version: version,
2242 config_name: k,
2243 config_value: v.map(|v| v.to_string()),
2244 })
2245 .collect::<Vec<_>>();
2246 all_configs.extend(configs_vec);
2247
2248 let feature_flags = protocol_configs
2249 .feature_map()
2250 .into_iter()
2251 .map(|(k, v)| StoredFeatureFlag {
2252 protocol_version: version,
2253 flag_name: k,
2254 flag_value: v,
2255 })
2256 .collect::<Vec<_>>();
2257 all_flags.extend(feature_flags);
2258 }
2259
2260 transactional_blocking_with_retry!(
2261 &self.blocking_cp,
2262 |conn| {
2263 for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2264 insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2265 }
2266 for flag_chunk in all_flags.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2267 insert_or_ignore_into!(feature_flags::table, flag_chunk, conn);
2268 }
2269 Ok::<(), IndexerError>(())
2270 },
2271 PG_DB_COMMIT_SLEEP_DURATION
2272 )?;
2273 Ok(())
2274 }
2275
2276 async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2277 if indices.is_empty() {
2278 return Ok(());
2279 }
2280 let len = indices.len();
2281 let guard = self
2282 .metrics
2283 .checkpoint_db_commit_latency_tx_indices
2284 .start_timer();
2285 let chunks = chunk!(indices, self.config.parallel_chunk_size);
2286
2287 let futures = chunks.into_iter().map(|chunk| {
2288 self.spawn_task(move |this: Self| async move {
2289 this.persist_tx_indices_chunk_v2(chunk).await
2290 })
2291 });
2292 futures::future::try_join_all(futures)
2293 .await
2294 .map_err(|e| {
2295 tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
2296 IndexerError::from(e)
2297 })?
2298 .into_iter()
2299 .collect::<Result<Vec<_>, _>>()
2300 .map_err(|e| {
2301 IndexerError::PostgresWrite(format!(
2302 "Failed to persist all tx_indices chunks: {e:?}"
2303 ))
2304 })?;
2305 let elapsed = guard.stop_and_record();
2306 info!(elapsed, "Persisted {} tx_indices chunks", len);
2307 Ok(())
2308 }
2309
2310 async fn persist_checkpoint_objects(
2311 &self,
2312 objects: Vec<CheckpointObjectChanges>,
2313 ) -> Result<(), IndexerError> {
2314 if objects.is_empty() {
2315 return Ok(());
2316 }
2317 let guard = self
2318 .metrics
2319 .checkpoint_db_commit_latency_objects
2320 .start_timer();
2321 let CheckpointObjectChanges {
2322 changed_objects: mutations,
2323 deleted_objects: deletions,
2324 } = retain_latest_objects_from_checkpoint_batch(objects);
2325 let mutation_len = mutations.len();
2326 let deletion_len = deletions.len();
2327
2328 let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
2329 let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
2330 let mutation_futures = mutation_chunks
2331 .into_iter()
2332 .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
2333 let deletion_futures = deletion_chunks
2334 .into_iter()
2335 .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
2336 futures::future::try_join_all(mutation_futures.chain(deletion_futures))
2337 .await
2338 .map_err(|e| {
2339 tracing::error!("failed to join futures for persisting objects: {e}");
2340 IndexerError::from(e)
2341 })?
2342 .into_iter()
2343 .collect::<Result<Vec<_>, _>>()
2344 .map_err(|e| {
2345 IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}",))
2346 })?;
2347
2348 let elapsed = guard.stop_and_record();
2349 info!(
2350 elapsed,
2351 "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
2352 );
2353 Ok(())
2354 }
2355
2356 async fn persist_tx_global_order(
2357 &self,
2358 tx_order: Vec<TxGlobalOrder>,
2359 ) -> Result<(), IndexerError> {
2360 let guard = self
2361 .metrics
2362 .checkpoint_db_commit_latency_tx_insertion_order
2363 .start_timer();
2364 let len = tx_order.len();
2365
2366 let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2367 let futures = chunks
2368 .into_iter()
2369 .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));
2370
2371 futures::future::try_join_all(futures)
2372 .await
2373 .map_err(|e| {
2374 tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}",);
2375 IndexerError::from(e)
2376 })?
2377 .into_iter()
2378 .collect::<Result<Vec<_>, _>>()
2379 .map_err(|e| {
2380 IndexerError::PostgresWrite(format!(
2381 "Failed to persist all txs insertion order chunks: {e:?}",
2382 ))
2383 })?;
2384 let elapsed = guard.stop_and_record();
2385 info!(elapsed, "Persisted {len} txs insertion orders");
2386 Ok(())
2387 }
2388
2389 async fn update_watermarks_lower_bound(
2390 &self,
2391 watermarks: Vec<(PrunableTable, u64)>,
2392 ) -> Result<(), IndexerError> {
2393 self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
2394 .await
2395 }
2396
2397 async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
2398 self.execute_in_blocking_worker(move |this| this.get_watermarks())
2399 .await
2400 }
2401
2402 async fn prune_table_by_checkpoint_range(
2403 &self,
2404 table: &crate::pruning::pruner::PrunableTable,
2405 min_checkpoint: u64,
2406 max_checkpoint: u64,
2407 ) -> Result<(), IndexerError> {
2408 let table_clone = *table;
2409 self.execute_in_blocking_worker(move |this| {
2410 this.prune_table_by_checkpoint_range(&table_clone, min_checkpoint, max_checkpoint)
2411 })
2412 .await
2413 }
2414
2415 async fn prune_table_by_tx_range(
2416 &self,
2417 table: &crate::pruning::pruner::PrunableTable,
2418 min_tx: u64,
2419 max_tx: u64,
2420 ) -> Result<(), IndexerError> {
2421 let table_clone = *table;
2422 self.execute_in_blocking_worker(move |this| {
2423 this.prune_single_tx_or_event_table(&table_clone, min_tx, max_tx)
2424 })
2425 .await
2426 }
2427
2428 async fn prune_table_by_global_seq_with_limit(
2429 &self,
2430 table: &crate::pruning::pruner::PrunableTable,
2431 start: u64,
2432 end: u64,
2433 limit: i64,
2434 ) -> Result<usize, IndexerError> {
2435 use crate::pruning::pruner::PrunableTable;
2436
2437 if !matches!(table, PrunableTable::OptimisticTransactions) {
2438 return Err(IndexerError::InvalidArgument(format!(
2439 "table {} does not support pruning by global order with limit",
2440 table.as_ref()
2441 )));
2442 }
2443
2444 self.execute_in_blocking_worker(move |this| {
2445 this.prune_optimistic_tx_by_global_seq(start, end, limit)
2446 })
2447 .await
2448 }
2449
2450 async fn update_watermark_lowest_unpruned_key(
2451 &self,
2452 table: &PrunableTable,
2453 lowest_unpruned_key: u64,
2454 ) -> Result<(), IndexerError> {
2455 let table = *table;
2456 self.execute_in_blocking_worker(move |this| {
2457 this.update_watermark_lowest_unpruned_key(&table, lowest_unpruned_key)
2458 })
2459 .await
2460 }
2461
2462 async fn get_watermark_by_entity(
2463 &self,
2464 entity: String,
2465 ) -> Result<Option<StoredWatermark>, IndexerError> {
2466 self.execute_in_blocking_worker(move |this| this.get_watermark_by_entity(&entity))
2467 .await
2468 }
2469}
2470
2471fn make_objects_history_to_commit(
2472 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2473) -> Result<Vec<StoredHistoryObject>, IndexerError> {
2474 let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2475 .clone()
2476 .into_iter()
2477 .flat_map(|changes| changes.deleted_objects)
2478 .map(|o| o.into())
2479 .collect();
2480 let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2481 .into_iter()
2482 .flat_map(|changes| changes.changed_objects)
2483 .map(StoredHistoryObject::try_from)
2484 .collect::<Result<Vec<_>, _>>()?;
2485 Ok(deleted_objects.into_iter().chain(mutated_objects).collect())
2486}
2487
2488fn retain_latest_indexed_objects(
2494 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2495) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2496 use std::collections::HashMap;
2497
2498 let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
2499 let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();
2500
2501 for change in tx_object_changes {
2502 for mutation in change.changed_objects {
2506 let id = mutation.object.id();
2507 let version = mutation.object.version();
2508
2509 if let Some(existing) = deletions.remove(&id) {
2510 assert!(
2511 existing.object_version < version,
2512 "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2513 existing.object_version
2514 );
2515 }
2516
2517 if let Some(existing) = mutations.insert(id, mutation) {
2518 assert!(
2519 existing.object.version() < version,
2520 "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2521 existing.object.version()
2522 );
2523 }
2524 }
2525 for deletion in change.deleted_objects {
2527 let id = deletion.object_id;
2528 let version = deletion.object_version;
2529
2530 if let Some(existing) = mutations.remove(&id) {
2531 assert!(
2532 existing.object.version() < version,
2533 "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2534 existing.object.version(),
2535 );
2536 }
2537
2538 if let Some(existing) = deletions.insert(id, deletion) {
2539 assert!(
2540 existing.object_version < version,
2541 "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2542 existing.object_version
2543 );
2544 }
2545 }
2546 }
2547
2548 (
2549 mutations.into_values().collect(),
2550 deletions.into_values().collect(),
2551 )
2552}