use core::result::Result::Ok;
use std::{any::Any as StdAny, collections::BTreeMap, time::Duration};

use async_trait::async_trait;
use diesel::{
    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
    dsl::{max, min},
    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
    upsert::excluded,
};
use downcast::Any;
use iota_protocol_config::ProtocolConfig;
use iota_types::{
    base_types::ObjectID,
    digests::{ChainIdentifier, CheckpointDigest},
};
use itertools::Itertools;
use tap::TapFallible;
use tracing::info;

use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
use crate::{
    db::ConnectionPool,
    errors::{Context, IndexerError},
    handlers::{EpochToCommit, TransactionObjectChangesToCommit},
    insert_or_ignore_into,
    metrics::IndexerMetrics,
    models::{
        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
        display::StoredDisplay,
        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
        events::StoredEvent,
        obj_indices::StoredObjectVersion,
        objects::{
            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
            StoredObjects,
        },
        packages::StoredPackage,
        transactions::{
            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
        },
        tx_indices::TxIndexV2Split,
    },
    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
    persist_chunk_into_table_in_existing_connection, read_only_blocking,
    rolling::transform::{
        CheckpointObjectChanges, LiveObject, RemovedObject,
        retain_latest_objects_from_checkpoint_batch,
    },
    schema::{
        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
        tx_wrapped_or_deleted_objects,
    },
    store::{IndexerStore, IndexerStoreExt},
    transactional_blocking_with_retry,
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, TxIndex, TxIndexV2,
    },
};

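/// Splits an owned iterable into `Vec<Vec<_>>` chunks of at most `$size`
/// elements each, via `Itertools::chunks`; e.g. `chunk!(vec![1, 2, 3, 4, 5], 2)`
/// yields `[[1, 2], [3, 4], [5]]`.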
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}

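/// Deletes all rows of `$table` whose `tx_sequence_number` falls in the
/// inclusive range `[$min_tx, $max_tx]`, wrapping any database error with
/// `$context_msg`.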
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}

/// Maximum number of rows inserted per statement within a single database
/// transaction.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
/// Default number of items per parallel commit chunk; can be overridden via
/// the `PG_COMMIT_PARALLEL_CHUNK_SIZE` environment variable.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
/// Default number of objects per parallel commit chunk; can be overridden via
/// the `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` environment variable.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
/// Duration handed to `transactional_blocking_with_retry!` for commit retries.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);

#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    pub parallel_chunk_size: usize,
    pub parallel_objects_chunk_size: usize,
}

pub struct PgIndexerStore {
    blocking_cp: ConnectionPool,
    metrics: IndexerMetrics,
    partition_manager: PgPartitionManager,
    config: PgIndexerStoreConfig,
}

impl Clone for PgIndexerStore {
    fn clone(&self) -> PgIndexerStore {
        Self {
            blocking_cp: self.blocking_cp.clone(),
            metrics: self.metrics.clone(),
            partition_manager: self.partition_manager.clone(),
            config: self.config.clone(),
        }
    }
}

impl PgIndexerStore {
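    /// Creates a new store backed by `blocking_cp`. Parallel chunk sizes
    /// default to the module constants and can be overridden at runtime via
    /// the `PG_COMMIT_PARALLEL_CHUNK_SIZE` and
    /// `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` environment variables; invalid
    /// override values panic at startup, as does a failure to initialize the
    /// partition manager.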
    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
            .expect("Failed to initialize partition manager");
        let config = PgIndexerStoreConfig {
            parallel_chunk_size,
            parallel_objects_chunk_size,
        };

        Self {
            blocking_cp,
            metrics,
            partition_manager,
            config,
        }
    }

    pub fn blocking_cp(&self) -> ConnectionPool {
        self.blocking_cp.clone()
    }

    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::epoch))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest epoch id from PostgresDB")
    }

    /// Returns the range of protocol versions still to be indexed: from one
    /// past the latest version already in `protocol_configs` up to the latest
    /// protocol version recorded in `epochs`.
    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        // Start from the next protocol version after the latest stored one,
        // defaulting to 1 when the table is empty.
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        // End at the protocol version of the latest known epoch, defaulting
        // to 1 when no epoch has been stored yet.
        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }

    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }

    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select(max(checkpoints::sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest checkpoint sequence number from PostgresDB")
    }

    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select((
                    min(checkpoints::sequence_number),
                    max(checkpoints::sequence_number),
                ))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
    }

    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((min(epochs::epoch), max(epochs::epoch)))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max epoch numbers from PostgresDB")
    }

    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.unwrap_or_default() as u64)
        })
        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
    }

    fn get_checkpoint_range_for_epoch(
        &self,
        epoch: u64,
    ) -> Result<(u64, Option<u64>), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
                .filter(epochs::epoch.eq(epoch as i64))
                .first::<(i64, Option<i64>)>(conn)
                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
        })
        .context("Failed reading checkpoint range from PostgresDB")
    }

    fn get_transaction_range_for_checkpoint(
        &self,
        checkpoint: u64,
    ) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select((
                    pruner_cp_watermark::min_tx_sequence_number,
                    pruner_cp_watermark::max_tx_sequence_number,
                ))
                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
                .first::<(i64, i64)>(conn)
                .map(|(min, max)| (min as u64, max as u64))
        })
        .context("Failed reading transaction range from PostgresDB")
    }

    fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            objects_snapshot::dsl::objects_snapshot
                .select(max(objects_snapshot::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
    }

    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            {
                let value = display_updates.values().collect::<Vec<_>>();
                |conn| self.persist_displays_in_existing_transaction(conn, value)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }

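    /// Upserts live objects into `objects`, keyed by `object_id`. Each row is
    /// joined against `tx_global_order` on its transaction digest and is
    /// skipped when the transaction already has a non-zero
    /// `optimistic_sequence_number` entry, i.e. when its effects were already
    /// applied through the optimistic indexing path.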
    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            INSERT INTO objects (
                object_id,
                object_version,
                object_digest,
                owner_type,
                owner_id,
                object_type,
                object_type_package,
                object_type_module,
                object_type_name,
                serialized_object,
                coin_type,
                coin_balance,
                df_kind
            )
            SELECT
                u.object_id,
                u.object_version,
                u.object_digest,
                u.owner_type,
                u.owner_id,
                u.object_type,
                u.object_type_package,
                u.object_type_module,
                u.object_type_name,
                u.serialized_object,
                u.coin_type,
                u.coin_balance,
                u.df_kind
            FROM UNNEST(
                $1::BYTEA[],
                $2::BIGINT[],
                $3::BYTEA[],
                $4::SMALLINT[],
                $5::BYTEA[],
                $6::TEXT[],
                $7::BYTEA[],
                $8::TEXT[],
                $9::TEXT[],
                $10::BYTEA[],
                $11::TEXT[],
                $12::BIGINT[],
                $13::SMALLINT[],
                $14::BYTEA[]
            ) AS u(
                object_id, object_version, object_digest, owner_type, owner_id,
                object_type, object_type_package, object_type_module, object_type_name,
                serialized_object, coin_type, coin_balance, df_kind, tx_digest
            )
            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            ON CONFLICT (object_id) DO UPDATE
            SET
                object_version = EXCLUDED.object_version,
                object_digest = EXCLUDED.object_digest,
                owner_type = EXCLUDED.owner_type,
                owner_id = EXCLUDED.owner_id,
                object_type = EXCLUDED.object_type,
                object_type_package = EXCLUDED.object_type_package,
                object_type_module = EXCLUDED.object_type_module,
                object_type_name = EXCLUDED.object_type_name,
                serialized_object = EXCLUDED.serialized_object,
                coin_type = EXCLUDED.coin_type,
                coin_balance = EXCLUDED.coin_balance,
                df_kind = EXCLUDED.df_kind
        "#;
        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
            .into_iter()
            .map(LiveObject::split)
            .map(|(indexed_object, tx_digest)| {
                (
                    StoredObject::from(indexed_object),
                    tx_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(objects.object_ids)
            .bind::<Array<BigInt>, _>(objects.object_versions)
            .bind::<Array<Bytea>, _>(objects.object_digests)
            .bind::<Array<SmallInt>, _>(objects.owner_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
            .bind::<Array<Bytea>, _>(objects.serialized_objects)
            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object mutations with error: {e}");
        })
    }

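    /// Deletes removed objects from `objects` by id. As in
    /// `persist_changed_objects`, each row is joined against `tx_global_order`
    /// on its transaction digest and skipped when the transaction already has
    /// a non-zero `optimistic_sequence_number` entry.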
    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            DELETE FROM objects
            WHERE object_id IN (
                SELECT u.object_id
                FROM UNNEST(
                    $1::BYTEA[],
                    $2::BYTEA[]
                ) AS u(object_id, tx_digest)
                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            )
        "#;
        let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
            .into_iter()
            .map(|removed_object| {
                (
                    removed_object.object_id().to_vec(),
                    removed_object.transaction_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(object_ids)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object deletions with error: {e}");
        })
    }

    fn persist_object_mutation_chunk(
        &self,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = mutated_object_mutation_chunk.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                self.persist_object_mutation_chunk_in_existing_transaction(
                    conn,
                    mutated_object_mutation_chunk.clone(),
                )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked objects", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object mutations with error: {}", e);
        })
    }

    fn persist_object_mutation_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        on_conflict_do_update!(
            objects::table,
            mutated_object_mutation_chunk,
            objects::object_id,
            (
                objects::object_id.eq(excluded(objects::object_id)),
                objects::object_version.eq(excluded(objects::object_version)),
                objects::object_digest.eq(excluded(objects::object_digest)),
                objects::owner_type.eq(excluded(objects::owner_type)),
                objects::owner_id.eq(excluded(objects::owner_id)),
                objects::object_type.eq(excluded(objects::object_type)),
                objects::serialized_object.eq(excluded(objects::serialized_object)),
                objects::coin_type.eq(excluded(objects::coin_type)),
                objects::coin_balance.eq(excluded(objects::coin_balance)),
                objects::df_kind.eq(excluded(objects::df_kind)),
            ),
            conn
        );
        Ok::<(), IndexerError>(())
    }

    fn persist_object_deletion_chunk(
        &self,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = deleted_objects_chunk.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                self.persist_object_deletion_chunk_in_existing_transaction(
                    conn,
                    deleted_objects_chunk.clone(),
                )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {} chunked objects", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object deletions with error: {}", e);
        })
    }

    fn persist_object_deletion_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        diesel::delete(
            objects::table.filter(
                objects::object_id.eq_any(
                    deleted_objects_chunk
                        .iter()
                        .map(|o| o.object_id.clone())
                        .collect::<Vec<_>>(),
                ),
            ),
        )
        .execute(conn)
        .map_err(IndexerError::from)
        .context("Failed to write object deletion to PostgresDB")?;

        Ok::<(), IndexerError>(())
    }

    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object snapshot with error: {}", e);
        })
    }

    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object history with error: {}", e);
        })
    }

    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist object versions with error: {e}");
        })
    }

    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        // If the batch starts at the genesis checkpoint, persist the protocol
        // configs, feature flags, and chain identifier first.
        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics
                            .max_committed_checkpoint_sequence_number
                            .set(stored_checkpoint.sequence_number);
                        self.metrics
                            .committed_checkpoint_timestamp_ms
                            .set(stored_checkpoint.timestamp_ms);
                        info!(
                            "Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}",
                            stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms
                        );
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist checkpoints with error: {}", e);
        })
    }

    fn persist_transactions_chunk(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks
            .start_timer();
        let transformation_guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks_transformation
            .start_timer();
        let transactions = transactions
            .iter()
            .map(StoredTransaction::from)
            .collect::<Vec<_>>();
        drop(transformation_guard);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked transactions",
                transactions.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist transactions with error: {}", e);
        })
    }

    fn persist_tx_global_order_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist txs insertion order with error: {e}");
        })
    }

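    /// Conditionally updates existing `tx_global_order` rows for checkpointed
    /// transactions: marks an entry's `optimistic_sequence_number` as
    /// `Completed` when it is still `Started`, and fills in
    /// `chk_tx_sequence_number` when it is still NULL.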
    fn update_status_for_checkpoint_transactions_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        let num_transactions = tx_order.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
                    conn
                );
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::chk_tx_sequence_number
                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
                    tx_global_order::chk_tx_sequence_number.is_null(),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Updated {} chunked values of `tx_global_order`", num_transactions
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to update `tx_global_order` with error: {e}");
        })
    }

    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events_chunks
            .start_timer();
        let len = events.len();
        let events = events
            .into_iter()
            .map(StoredEvent::from)
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(events::table, event_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked events", len);
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist events with error: {}", e);
        })
    }

    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_packages
            .start_timer();
        let packages = packages
            .into_iter()
            .map(StoredPackage::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update!(
                        packages::table,
                        packages_chunk,
                        packages::package_id,
                        (
                            packages::package_id.eq(excluded(packages::package_id)),
                            packages::move_package.eq(excluded(packages::move_package)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} packages", packages.len());
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist packages with error: {}", e);
        })
    }

    async fn persist_event_indices_chunk(
        &self,
        indices: Vec<EventIndex>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices_chunks
            .start_timer();
        let len = indices.len();
        let (
            event_emit_packages,
            event_emit_modules,
            event_senders,
            event_struct_packages,
            event_struct_modules,
            event_struct_names,
            event_struct_instantiations,
        ) = indices.into_iter().map(|i| i.split()).fold(
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            ),
            |(
                mut event_emit_packages,
                mut event_emit_modules,
                mut event_senders,
                mut event_struct_packages,
                mut event_struct_modules,
                mut event_struct_names,
                mut event_struct_instantiations,
            ),
             index| {
                event_emit_packages.push(index.0);
                event_emit_modules.push(index.1);
                event_senders.push(index.2);
                event_struct_packages.push(index.3);
                event_struct_modules.push(index.4);
                event_struct_names.push(index.5);
                event_struct_instantiations.push(index.6);
                (
                    event_emit_packages,
                    event_emit_modules,
                    event_senders,
                    event_struct_packages,
                    event_struct_modules,
                    event_struct_names,
                    event_struct_instantiations,
                )
            },
        );

        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_package::table,
                event_emit_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_module::table,
                event_emit_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_package::table,
                event_struct_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_module::table,
                event_struct_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_name::table,
                event_struct_names,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_instantiation::table,
                event_struct_instantiations,
                &this.blocking_cp
            )
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join event indices futures in a chunk: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked event indices", len);
        Ok(())
    }

    async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices_chunks
            .start_timer();
        let len = indices.len();
        let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
            indices.into_iter().map(|i| i.split()).fold(
                (
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                    Vec::new(),
                ),
                |(
                    mut tx_senders,
                    mut tx_recipients,
                    mut tx_input_objects,
                    mut tx_changed_objects,
                    mut tx_pkgs,
                    mut tx_mods,
                    mut tx_funs,
                    mut tx_digests,
                    mut tx_kinds,
                ),
                 index| {
                    tx_senders.extend(index.0);
                    tx_recipients.extend(index.1);
                    tx_input_objects.extend(index.2);
                    tx_changed_objects.extend(index.3);
                    tx_pkgs.extend(index.4);
                    tx_mods.extend(index.5);
                    tx_funs.extend(index.6);
                    tx_digests.extend(index.7);
                    tx_kinds.extend(index.8);
                    (
                        tx_senders,
                        tx_recipients,
                        tx_input_objects,
                        tx_changed_objects,
                        tx_pkgs,
                        tx_mods,
                        tx_funs,
                        tx_digests,
                        tx_kinds,
                    )
                },
            );

        let futures = [
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_changed_objects::table,
                    changed_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
            }),
        ];

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked tx_indices", len);
        Ok(())
    }

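    /// Like `persist_tx_indices_chunk`, but splits `TxIndexV2` values via
    /// `TxIndexV2Split` and additionally persists the
    /// `tx_wrapped_or_deleted_objects` index table.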
    async fn persist_tx_indices_chunk_v2(
        &self,
        indices: Vec<TxIndexV2>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices_chunks
            .start_timer();
        let len = indices.len();

        let splits: Vec<TxIndexV2Split> = indices.into_iter().map(|i| i.split()).collect();

        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
        let recipients: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_recipients.clone())
            .collect();
        let input_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_input_objects.clone())
            .collect();
        let changed_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_changed_objects.clone())
            .collect();
        let wrapped_or_deleted_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
            .collect();
        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();

        let futures = [
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_changed_objects::table,
                    changed_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_wrapped_or_deleted_objects::table,
                    wrapped_or_deleted_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
            }),
        ];

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked tx_indices", len);
        Ok(())
    }

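    /// Persists an epoch transition: when the commit carries end-of-epoch
    /// data for the previous epoch, updates that epoch's row in `epochs`,
    /// then inserts the row for the newly started epoch.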
    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                if let Some(last_epoch) = &epoch.last_epoch {
                    info!(last_epoch.epoch, "Persisting epoch end data.");
                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
                        .set(last_epoch)
                        .execute(conn)?;
                }

                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist epoch with error: {}", e);
        })
    }

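    /// Advances all epoch-partitioned tables to the new epoch's partition,
    /// based on the previous epoch's boundary data read back from the
    /// database. Tables whose partitioning strategy is not epoch-based are
    /// skipped.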
    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
        if let Some(last_epoch_id) = last_epoch_id {
            let last_db_epoch: Option<StoredEpochInfo> =
                read_only_blocking!(&self.blocking_cp, |conn| {
                    epochs::table
                        .filter(epochs::epoch.eq(last_epoch_id))
                        .first::<StoredEpochInfo>(conn)
                        .optional()
                })
                .context("Failed to read last epoch from PostgresDB")?;
            if let Some(last_epoch) = last_db_epoch {
                let epoch_partition_data =
                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
                let table_partitions = self.partition_manager.get_table_partitions()?;
                for (table, (_, last_partition)) in table_partitions {
                    if !self
                        .partition_manager
                        .get_strategy(&table)
                        .is_epoch_partitioned()
                    {
                        continue;
                    }
                    let guard = self.metrics.advance_epoch_latency.start_timer();
                    self.partition_manager.advance_epoch(
                        table.clone(),
                        last_partition,
                        &epoch_partition_data,
                    )?;
                    let elapsed = guard.stop_and_record();
                    info!(
                        elapsed,
                        "Advanced epoch partition {} for table {}", last_partition, table
                    );
                }
            } else {
                tracing::error!("Last epoch {} was not found in PostgresDB.", last_epoch_id);
            }
        }

        Ok(())
    }

    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    event_emit_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_emit_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_package table"
                );
                prune_tx_or_event_indice_table!(
                    event_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_senders table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_instantiation,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_instantiation table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_name,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_name table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_package table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    tx_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_senders table"
                );
                prune_tx_or_event_indice_table!(
                    tx_recipients,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_recipients table"
                );
                prune_tx_or_event_indice_table!(
                    tx_input_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_input_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_changed_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_changed_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_wrapped_or_deleted_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_wrapped_or_deleted_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_pkg,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_pkg table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_mod,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_mod table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_fun,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_fun table"
                );
                prune_tx_or_event_indice_table!(
                    tx_digests,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_digests table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    pruner_cp_watermark::table
                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune pruner_cp_watermark table")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::table
                .filter(epochs::epoch.eq(epoch as i64))
                .select(epochs::network_total_transactions)
                .get_result::<Option<i64>>(conn)
        })
        .context("Failed to get network total transactions in epoch")
        .map(|option| option.map(|v| v as u64))
    }

    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("Failed to refresh participation_metrics: {e}");
        })
    }

    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            f(this)
        })
        .await
        .map_err(Into::into)
        .and_then(std::convert::identity)
    }

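    /// Runs `f` with a clone of the store on Tokio's blocking thread pool,
    /// propagating the current tracing span. The returned `JoinHandle` must
    /// be awaited by the caller; the wait-latency timer is stopped as soon as
    /// the task actually starts executing.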
    fn spawn_blocking_task<F, R>(
        &self,
        f: F,
    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            let _elapsed = guard.stop_and_record();
            f(this)
        })
    }

    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        tokio::task::spawn(async move { f(this).await })
    }
}

#[async_trait]
impl IndexerStore for PgIndexerStore {
    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
            .await
    }

    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
            .await
    }

    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
            .await
    }

    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
            .await
    }

    async fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| {
            this.get_latest_object_snapshot_checkpoint_sequence_number()
        })
        .await
    }

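    /// Persists object changes from a batch of transactions. Changes are
    /// first deduplicated per object via `retain_latest_indexed_objects` (a
    /// helper in scope in this module) so only each object's latest mutation
    /// or deletion survives, then written in parallel chunks of
    /// `parallel_objects_chunk_size`.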
    async fn persist_objects(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects
            .start_timer();
        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let object_mutations = indexed_mutations
            .into_iter()
            .map(StoredObject::from)
            .collect::<Vec<_>>();
        let object_deletions = indexed_deletions
            .into_iter()
            .map(StoredDeletedObject::from)
            .collect::<Vec<_>>();
        let mutation_len = object_mutations.len();
        let deletion_len = object_deletions.len();

        let object_mutation_chunks =
            chunk!(object_mutations, self.config.parallel_objects_chunk_size);
        let object_deletion_chunks =
            chunk!(object_deletions, self.config.parallel_objects_chunk_size);
        let mutation_futures = object_mutation_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_mutation_chunk(c)));
        futures::future::try_join_all(mutation_futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join persist_object_mutation_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all object mutation chunks: {e:?}"
                ))
            })?;
        let deletion_futures = object_deletion_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_deletion_chunk(c)));
        futures::future::try_join_all(deletion_futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join persist_object_deletion_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all object deletion chunks: {e:?}"
                ))
            })?;

        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
        );
        Ok(())
    }

    fn persist_objects_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }

        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let object_mutations = indexed_mutations
            .into_iter()
            .map(StoredObject::from)
            .collect::<Vec<_>>();
        let object_deletions = indexed_deletions
            .into_iter()
            .map(StoredDeletedObject::from)
            .collect::<Vec<_>>();

        self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
        self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;

        Ok(())
    }

    async fn persist_objects_snapshot(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        if object_changes.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot
            .start_timer();
        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
        let objects_snapshot = indexed_mutations
            .into_iter()
            .map(StoredObjectSnapshot::from)
            .chain(
                indexed_deletions
                    .into_iter()
                    .map(StoredObjectSnapshot::from),
            )
            .collect::<Vec<_>>();
        let len = objects_snapshot.len();
        let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join backfill_objects_snapshot_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects snapshot chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects snapshot", len);
        Ok(())
    }

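    /// Persists object history rows in parallel chunks, unless the
    /// `SKIP_OBJECT_HISTORY` environment variable is set to "true", in which
    /// case the batch is acknowledged without writing anything.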
    async fn persist_object_history(
        &self,
        object_changes: Vec<TransactionObjectChangesToCommit>,
    ) -> Result<(), IndexerError> {
        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
            .map(|val| val.eq_ignore_ascii_case("true"))
            .unwrap_or(false);
        if skip_history {
            info!("skipping object history");
            return Ok(());
        }

        if object_changes.is_empty() {
            return Ok(());
        }
        let objects = make_objects_history_to_commit(object_changes);
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history
            .start_timer();

        let len = objects.len();
        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join persist_objects_history_chunk futures: {}",
                    e
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects history chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} objects history", len);
        Ok(())
    }

    async fn persist_object_versions(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        if object_versions.is_empty() {
            return Ok(());
        }

        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version
            .start_timer();

        let object_versions_count = object_versions.len();

        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
            .collect::<Vec<_>>();

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_object_version_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all objects version chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {object_versions_count} object versions");
        Ok(())
    }

1916 async fn persist_checkpoints(
1917 &self,
1918 checkpoints: Vec<IndexedCheckpoint>,
1919 ) -> Result<(), IndexerError> {
1920 self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1921 .await
1922 }
1923
    async fn persist_transactions(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions
            .start_timer();
        let len = transactions.len();

        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_transactions_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all transactions chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} transactions", len);
        Ok(())
    }

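    /// Inserts a single optimistic transaction within the caller's open
    /// database transaction, ignoring conflicts with already-present rows.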
    fn persist_optimistic_transaction_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        transaction: OptimisticTransaction,
    ) -> Result<(), IndexerError> {
        insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
        Ok(())
    }

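    /// Persists event rows concurrently in chunks; a no-op for an empty
    /// batch.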
    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        if events.is_empty() {
            return Ok(());
        }
        let len = events.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events
            .start_timer();
        let chunks = chunk!(events, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_events_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} events", len);
        Ok(())
    }

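    /// Persists the latest `Display` update per object type on a blocking
    /// worker thread; a no-op for an empty update map.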
    async fn persist_displays(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
            .await?
    }

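    /// Upserts `display` rows keyed by `object_type` within the caller's open
    /// database transaction. The conflict condition only applies the update
    /// when the incoming (`excluded`) version is strictly greater than the
    /// stored one, so a stale `Display` update can never overwrite newer
    /// state.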
    fn persist_displays_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        display_updates: Vec<&StoredDisplay>,
    ) -> Result<(), IndexerError> {
        if display_updates.is_empty() {
            return Ok(());
        }

        on_conflict_do_update_with_condition!(
            display::table,
            display_updates,
            display::object_type,
            (
                display::id.eq(excluded(display::id)),
                display::version.eq(excluded(display::version)),
                display::bcs.eq(excluded(display::bcs)),
            ),
            excluded(display::version).gt(display::version),
            conn
        );

        Ok(())
    }

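    /// Persists Move package rows via the synchronous implementation on a
    /// blocking worker thread; a no-op for an empty batch.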
    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
            .await
    }

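    /// Persists event index rows. Unlike most writers here, each chunk is
    /// dispatched with `spawn_task` (a plain async task) rather than
    /// `spawn_blocking_task`, since `persist_event_indices_chunk` is itself
    /// async.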
    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_event_indices_chunk(chunk).await
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} event_indices", len);
        Ok(())
    }

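    /// Persists transaction index rows, one async task per chunk.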
    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(
                move |this: Self| async move { this.persist_tx_indices_chunk(chunk).await },
            )
        });
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_tx_indices_chunk futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} tx_indices", len);
        Ok(())
    }

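    /// Persists epoch info by delegating to the synchronous implementation on
    /// a blocking worker thread.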
    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
            .await
    }

    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
            .await
    }

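    /// Prunes all checkpoints of `epoch`, clamping the lower bound to the
    /// minimum prunable checkpoint so already-pruned ranges are skipped. For
    /// each checkpoint this drops the checkpoint row, the transaction and
    /// event index ranges, and finally the `pruner_cp_watermark` entry,
    /// updating the pruner metrics along the way. Failures of individual
    /// steps are logged and skipped so that one bad checkpoint does not wedge
    /// the pruner.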
    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
            _ => Err(IndexerError::PostgresRead(format!(
                "Failed to get checkpoint range for epoch {epoch}"
            ))),
        }?;

        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
        min_cp = std::cmp::max(min_cp, min_prunable_cp);
        for cp in min_cp..=max_cp {
            info!(
                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
                cp, epoch, min_prunable_cp
            );
            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("Failed to prune checkpoint {}: {}", cp, e);
                });

            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
            self.execute_in_blocking_worker(move |this| {
                this.prune_tx_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("Failed to prune transactions for cp {}: {}", cp, e);
            });
            info!(
                "Pruned transactions for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.execute_in_blocking_worker(move |this| {
                this.prune_event_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!(
                    "Failed to prune events of transactions for cp {}: {}",
                    cp,
                    e
                );
            });
            info!(
                "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.metrics.last_pruned_transaction.set(max_tx as i64);

            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!(
                        "Failed to prune pruner_cp_watermark table for cp {}: {}",
                        cp,
                        e
                    );
                });
            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
            self.metrics.last_pruned_checkpoint.set(cp as i64);
        }

        Ok(())
    }

    async fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| {
            this.get_network_total_transactions_by_end_of_epoch(epoch)
        })
        .await
    }

    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
            .await
    }

    fn as_any(&self) -> &dyn StdAny {
        self
    }

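    /// Backfills `protocol_configs` and `feature_flags` for every protocol
    /// version in the indexed range. The chain identifier is reconstructed
    /// from the genesis checkpoint digest bytes so that
    /// `ProtocolConfig::get_for_version_if_supported` resolves configs for
    /// the correct chain; all rows are then written in a single retried
    /// transaction, with configs inserted in chunks of
    /// `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX`.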
    fn persist_protocol_configs_and_feature_flags(
        &self,
        chain_id: Vec<u8>,
    ) -> Result<(), IndexerError> {
        let chain_id = ChainIdentifier::from(
            CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
        );

        let mut all_configs = vec![];
        let mut all_flags = vec![];

        let (start_version, end_version) = self.get_protocol_version_index_range()?;
        info!(
            "Persisting protocol configs with start_version: {}, end_version: {}",
            start_version, end_version
        );

        for version in start_version..=end_version {
            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
                (version as u64).into(),
                chain_id.chain(),
            )
            .ok_or(IndexerError::Generic(format!(
                "Unable to fetch protocol version {} and chain {:?}",
                version,
                chain_id.chain()
            )))?;
            let configs_vec = protocol_configs
                .attr_map()
                .into_iter()
                .map(|(k, v)| StoredProtocolConfig {
                    protocol_version: version,
                    config_name: k,
                    config_value: v.map(|v| v.to_string()),
                })
                .collect::<Vec<_>>();
            all_configs.extend(configs_vec);

            let feature_flags = protocol_configs
                .feature_map()
                .into_iter()
                .map(|(k, v)| StoredFeatureFlag {
                    protocol_version: version,
                    flag_name: k,
                    flag_value: v,
                })
                .collect::<Vec<_>>();
            all_flags.extend(feature_flags);
        }

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
                }
                insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;
        Ok(())
    }
}

#[async_trait]
impl IndexerStoreExt for PgIndexerStore {
    async fn persist_tx_indices_v2(&self, indices: Vec<TxIndexV2>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_tx_indices_chunk_v2(chunk).await
            })
        });
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_tx_indices_chunk_v2 futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} tx_indices", len);
        Ok(())
    }

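    /// Persists live-object mutations and removals for a batch of
    /// checkpoints. The batch is first collapsed with
    /// `retain_latest_objects_from_checkpoint_batch` so only the latest state
    /// per object survives, then mutations and deletions are committed as two
    /// separate waves of chunked blocking tasks, with all mutations completing
    /// before any deletion is written.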
    async fn persist_checkpoint_objects(
        &self,
        objects: Vec<CheckpointObjectChanges>,
    ) -> Result<(), IndexerError> {
        if objects.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects
            .start_timer();
        let CheckpointObjectChanges {
            changed_objects: mutations,
            deleted_objects: deletions,
        } = retain_latest_objects_from_checkpoint_batch(objects);
        let mutation_len = mutations.len();
        let deletion_len = deletions.len();

        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
        let mutation_futures = mutation_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
        futures::future::try_join_all(mutation_futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_changed_objects futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all object mutation chunks: {e:?}",
                ))
            })?;
        let deletion_futures = deletion_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
        futures::future::try_join_all(deletion_futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_removed_objects futures: {}", e);
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all object deletion chunks: {e:?}",
                ))
            })?;

        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
        );
        Ok(())
    }

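    /// Updates the index status of the given checkpointed transactions in
    /// `tx_global_order`, chunked by `parallel_chunk_size` and committed on
    /// blocking worker threads.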
    async fn update_status_for_checkpoint_transactions(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks.into_iter().map(|c| {
            self.spawn_blocking_task(move |this| {
                this.update_status_for_checkpoint_transactions_chunk(c)
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "Failed to join update_status_for_checkpoint_transactions_chunk futures: {e}",
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to update all `tx_global_order` chunks: {e:?}",
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Updated index status for {len} tx insertion orders"
        );
        Ok(())
    }

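    /// Persists the global insertion order of checkpoint transactions into
    /// `tx_global_order`, chunked by `parallel_chunk_size`.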
    async fn persist_tx_global_order(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("Failed to join persist_tx_global_order_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx insertion order chunks: {e:?}",
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {len} tx insertion orders");
        Ok(())
    }
}

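/// Flattens per-transaction object changes into `objects_history` rows,
/// emitting all deletion rows before all mutation rows.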
fn make_objects_history_to_commit(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Vec<StoredHistoryObject> {
    let mut deleted_objects: Vec<StoredHistoryObject> = Vec::new();
    let mut mutated_objects: Vec<StoredHistoryObject> = Vec::new();
    for changes in tx_object_changes {
        deleted_objects.extend(changes.deleted_objects.into_iter().map(Into::into));
        mutated_objects.extend(changes.changed_objects.into_iter().map(Into::into));
    }
    deleted_objects.extend(mutated_objects);
    deleted_objects
}

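/// Collapses a batch of per-transaction object changes down to the latest
/// state per object ID: a later mutation supersedes an earlier mutation or
/// deletion of the same object, and vice versa. The assertions encode the
/// expectation that versions are strictly increasing within the batch, which
/// holds as long as the changes are supplied in execution order.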
fn retain_latest_indexed_objects(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
    use std::collections::HashMap;

    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();

    for change in tx_object_changes {
        for mutation in change.changed_objects {
            let id = mutation.object.id();
            let version = mutation.object.version();

            if let Some(existing) = deletions.remove(&id) {
                assert!(
                    existing.object_version < version.value(),
                    "Mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }

            if let Some(existing) = mutations.insert(id, mutation) {
                assert!(
                    existing.object.version() < version,
                    "Mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version()
                );
            }
        }
        for deletion in change.deleted_objects {
            let id = deletion.object_id;
            let version = deletion.object_version;

            if let Some(existing) = mutations.remove(&id) {
                assert!(
                    existing.object.version().value() < version,
                    "Deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version(),
                );
            }

            if let Some(existing) = deletions.insert(id, deletion) {
                assert!(
                    existing.object_version < version,
                    "Deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }
        }
    }

    (
        mutations.into_values().collect(),
        deletions.into_values().collect(),
    )
}