use core::result::Result::Ok;
use std::{any::Any as StdAny, collections::BTreeMap, time::Duration};

use async_trait::async_trait;
use diesel::{
    ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
    dsl::{max, min},
    sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
    upsert::excluded,
};
use downcast::Any;
use iota_protocol_config::ProtocolConfig;
use iota_types::{
    base_types::ObjectID,
    digests::{ChainIdentifier, CheckpointDigest},
};
use itertools::Itertools;
use tap::TapFallible;
use tracing::info;

use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
use crate::{
    db::ConnectionPool,
    errors::{Context, IndexerError, IndexerResult},
    ingestion::{
        common::prepare::{
            CheckpointObjectChanges, LiveObject, RemovedObject,
            retain_latest_objects_from_checkpoint_batch,
        },
        primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
    },
    insert_or_ignore_into,
    metrics::IndexerMetrics,
    models::{
        checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
        display::StoredDisplay,
        epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
        events::StoredEvent,
        obj_indices::StoredObjectVersion,
        objects::{
            StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot,
            StoredObjects,
        },
        packages::StoredPackage,
        transactions::{
            CheckpointTxGlobalOrder, IndexStatus, OptimisticTransaction, StoredTransaction,
        },
        tx_indices::TxIndexSplit,
    },
    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
    persist_chunk_into_table_in_existing_connection, read_only_blocking,
    schema::{
        chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
        event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
        objects_version, optimistic_transactions, packages, protocol_configs, pruner_cp_watermark,
        transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
        tx_global_order, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
        tx_wrapped_or_deleted_objects,
    },
    store::IndexerStore,
    transactional_blocking_with_retry,
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, TxIndex,
    },
};

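/// Cursor into the `tx_global_order` table, identifying a transaction by its
/// `(global_sequence_number, optimistic_sequence_number)` pair. Used as an
/// inclusive upper bound when pruning `optimistic_transactions`.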
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TxGlobalOrderCursor {
    pub global_sequence_number: i64,
    pub optimistic_sequence_number: i64,
}

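/// Splits `$data` into owned chunks of at most `$size` elements each.
///
/// A minimal usage sketch (illustrative values; requires `itertools::Itertools`
/// to be in scope, as it is in this module):
///
/// ```ignore
/// let batches: Vec<Vec<u32>> = chunk!(vec![1, 2, 3, 4, 5], 2);
/// assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
/// ```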
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        $data
            .into_iter()
            .chunks($size)
            .into_iter()
            .map(|c| c.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}

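/// Deletes all rows of `$table` whose `tx_sequence_number` lies in the
/// inclusive range `[$min_tx, $max_tx]`, attaching `$context_msg` to any
/// error. Must be invoked in a function returning `Result` (it uses `?`).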
macro_rules! prune_tx_or_event_indice_table {
    ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
        diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
            .execute($conn)
            .map_err(IndexerError::from)
            .context($context_msg)?;
    };
}

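/// Maximum number of rows written per statement inside a single database
/// transaction.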
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
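/// Default number of rows per parallel commit chunk; can be overridden via the
/// `PG_COMMIT_PARALLEL_CHUNK_SIZE` environment variable (see
/// [`PgIndexerStore::new`]).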
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);

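/// Chunk-size configuration for parallel commits, resolved once at
/// construction time in [`PgIndexerStore::new`].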
#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    pub parallel_chunk_size: usize,
    pub parallel_objects_chunk_size: usize,
}

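/// PostgreSQL-backed implementation of [`IndexerStore`], committing indexed
/// data through a blocking connection pool from dedicated worker threads.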
pub struct PgIndexerStore {
    blocking_cp: ConnectionPool,
    metrics: IndexerMetrics,
    partition_manager: PgPartitionManager,
    config: PgIndexerStoreConfig,
}

impl Clone for PgIndexerStore {
    fn clone(&self) -> PgIndexerStore {
        Self {
            blocking_cp: self.blocking_cp.clone(),
            metrics: self.metrics.clone(),
            partition_manager: self.partition_manager.clone(),
            config: self.config.clone(),
        }
    }
}

impl PgIndexerStore {
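    /// Creates a new store backed by `blocking_cp`, reading the parallel chunk
    /// sizes from the `PG_COMMIT_PARALLEL_CHUNK_SIZE` and
    /// `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` environment variables when set.
    ///
    /// Panics if either variable holds a non-`usize` value or if the partition
    /// manager cannot be initialized.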
    pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
            .parse::<usize>()
            .unwrap();
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())
            .expect("failed to initialize partition manager");
        let config = PgIndexerStoreConfig {
            parallel_chunk_size,
            parallel_objects_chunk_size,
        };

        Self {
            blocking_cp,
            metrics,
            partition_manager,
            config,
        }
    }

    pub fn get_metrics(&self) -> IndexerMetrics {
        self.metrics.clone()
    }

    pub fn blocking_cp(&self) -> ConnectionPool {
        self.blocking_cp.clone()
    }

    pub(crate) async fn get_latest_epoch_id_in_blocking_worker(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_latest_epoch_id())
            .await
    }

    pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::epoch))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest epoch id from PostgresDB")
    }

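    /// Returns the inclusive range of protocol versions that still need to be
    /// indexed: from one past the highest version already stored in
    /// `protocol_configs` (or 1 if the table is empty) up to the protocol
    /// version of the latest stored epoch.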
    pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
        let start = read_only_blocking!(&self.blocking_cp, |conn| {
            protocol_configs::dsl::protocol_configs
                .select(max(protocol_configs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest protocol version from PostgresDB")?
        .map_or(1, |v| v + 1);

        let end = read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select(max(epochs::protocol_version))
                .first::<Option<i64>>(conn)
        })
        .context("Failed reading latest epoch protocol version from PostgresDB")?
        .unwrap_or(1);
        Ok((start, end))
    }

    pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            chain_identifier::dsl::chain_identifier
                .select(chain_identifier::checkpoint_digest)
                .first::<Vec<u8>>(conn)
                .optional()
        })
        .context("Failed reading chain id from PostgresDB")
    }

    fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select(max(checkpoints::sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest checkpoint sequence number from PostgresDB")
    }

    fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            checkpoints::dsl::checkpoints
                .select((
                    min(checkpoints::sequence_number),
                    max(checkpoints::sequence_number),
                ))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
    }

    fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((min(epochs::epoch), max(epochs::epoch)))
                .first::<(Option<i64>, Option<i64>)>(conn)
                .map(|(min, max)| {
                    (
                        min.unwrap_or_default() as u64,
                        max.unwrap_or_default() as u64,
                    )
                })
        })
        .context("Failed reading min and max epoch numbers from PostgresDB")
    }

    fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select(min(pruner_cp_watermark::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.unwrap_or_default() as u64)
        })
        .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
    }

    fn get_checkpoint_range_for_epoch(
        &self,
        epoch: u64,
    ) -> Result<(u64, Option<u64>), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::dsl::epochs
                .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
                .filter(epochs::epoch.eq(epoch as i64))
                .first::<(i64, Option<i64>)>(conn)
                .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
        })
        .context(
            format!("failed reading checkpoint range from PostgresDB for epoch {epoch}").as_str(),
        )
    }

    fn get_transaction_range_for_checkpoint(
        &self,
        checkpoint: u64,
    ) -> Result<(u64, u64), IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            pruner_cp_watermark::dsl::pruner_cp_watermark
                .select((
                    pruner_cp_watermark::min_tx_sequence_number,
                    pruner_cp_watermark::max_tx_sequence_number,
                ))
                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
                .first::<(i64, i64)>(conn)
                .map(|(min, max)| (min as u64, max as u64))
        })
        .context(
            format!("failed reading transaction range from PostgresDB for checkpoint {checkpoint}")
                .as_str(),
        )
    }

    pub(crate) async fn get_global_order_for_tx_seq_in_blocking_worker(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        self.execute_in_blocking_worker(move |this| this.get_global_order_for_tx_seq(tx_seq))
            .await
    }

    fn get_global_order_for_tx_seq(
        &self,
        tx_seq: i64,
    ) -> Result<TxGlobalOrderCursor, IndexerError> {
        let result = read_only_blocking!(&self.blocking_cp, |conn| {
            tx_global_order::dsl::tx_global_order
                .select((
                    tx_global_order::global_sequence_number,
                    tx_global_order::optimistic_sequence_number,
                ))
                .filter(tx_global_order::chk_tx_sequence_number.eq(tx_seq))
                .first::<(i64, i64)>(conn)
        })
        .context(
            format!("failed reading global sequence number from PostgresDB for tx seq {tx_seq}")
                .as_str(),
        )?;
        let (global_sequence_number, optimistic_sequence_number) = result;
        Ok(TxGlobalOrderCursor {
            global_sequence_number,
            optimistic_sequence_number,
        })
    }

    pub(crate) async fn prune_optimistic_transactions_up_to_in_blocking_worker(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        self.execute_in_blocking_worker(move |this| {
            this.prune_optimistic_transactions_up_to(to, limit)
        })
        .await
    }

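    /// Deletes at most `limit` rows from `optimistic_transactions` whose
    /// `(global_sequence_number, optimistic_sequence_number)` pair is `<= to`,
    /// locking the batch with `FOR UPDATE` so concurrent pruners do not
    /// contend on the same rows. Returns the number of rows deleted.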
    fn prune_optimistic_transactions_up_to(
        &self,
        to: TxGlobalOrderCursor,
        limit: i64,
    ) -> IndexerResult<usize> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                let sql = r#"
                WITH ids_to_delete AS (
                    SELECT global_sequence_number, optimistic_sequence_number
                    FROM optimistic_transactions
                    WHERE (global_sequence_number, optimistic_sequence_number) <= ($1, $2)
                    ORDER BY global_sequence_number, optimistic_sequence_number
                    FOR UPDATE LIMIT $3
                )
                DELETE FROM optimistic_transactions otx
                USING ids_to_delete
                WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
                      (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
                "#;
                diesel::sql_query(sql)
                    .bind::<BigInt, _>(to.global_sequence_number)
                    .bind::<BigInt, _>(to.optimistic_sequence_number)
                    .bind::<BigInt, _>(limit)
                    .execute(conn)
                    .map_err(IndexerError::from)
                    .context(
                        format!("failed to prune optimistic_transactions table to {to:?} with limit {limit}").as_str(),
                    )
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            objects_snapshot::dsl::objects_snapshot
                .select(max(objects_snapshot::checkpoint_sequence_number))
                .first::<Option<i64>>(conn)
                .map(|v| v.map(|v| v as u64))
        })
        .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
    }

    fn persist_display_updates(
        &self,
        display_updates: BTreeMap<String, StoredDisplay>,
    ) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            {
                let value = display_updates.values().collect::<Vec<_>>();
                |conn| self.persist_displays_in_existing_transaction(conn, value)
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;

        Ok(())
    }

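    /// Upserts live objects into `objects` via a bulk `UNNEST` insert. Rows
    /// are joined against `tx_global_order` and skipped when the producing
    /// transaction carries a nonzero `optimistic_sequence_number`, presumably
    /// because that change was already applied through the optimistic path.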
    fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            INSERT INTO objects (
                object_id,
                object_version,
                object_digest,
                owner_type,
                owner_id,
                object_type,
                object_type_package,
                object_type_module,
                object_type_name,
                serialized_object,
                coin_type,
                coin_balance,
                df_kind
            )
            SELECT
                u.object_id,
                u.object_version,
                u.object_digest,
                u.owner_type,
                u.owner_id,
                u.object_type,
                u.object_type_package,
                u.object_type_module,
                u.object_type_name,
                u.serialized_object,
                u.coin_type,
                u.coin_balance,
                u.df_kind
            FROM UNNEST(
                $1::BYTEA[],
                $2::BIGINT[],
                $3::BYTEA[],
                $4::SMALLINT[],
                $5::BYTEA[],
                $6::TEXT[],
                $7::BYTEA[],
                $8::TEXT[],
                $9::TEXT[],
                $10::BYTEA[],
                $11::TEXT[],
                $12::BIGINT[],
                $13::SMALLINT[],
                $14::BYTEA[]
            ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest)
            LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
            WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            ON CONFLICT (object_id) DO UPDATE
            SET
                object_version = EXCLUDED.object_version,
                object_digest = EXCLUDED.object_digest,
                owner_type = EXCLUDED.owner_type,
                owner_id = EXCLUDED.owner_id,
                object_type = EXCLUDED.object_type,
                object_type_package = EXCLUDED.object_type_package,
                object_type_module = EXCLUDED.object_type_module,
                object_type_name = EXCLUDED.object_type_name,
                serialized_object = EXCLUDED.serialized_object,
                coin_type = EXCLUDED.coin_type,
                coin_balance = EXCLUDED.coin_balance,
                df_kind = EXCLUDED.df_kind
        "#;
        let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
            .into_iter()
            .map(LiveObject::split)
            .map(|(indexed_object, tx_digest)| {
                (
                    StoredObject::from(indexed_object),
                    tx_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(objects.object_ids)
            .bind::<Array<BigInt>, _>(objects.object_versions)
            .bind::<Array<Bytea>, _>(objects.object_digests)
            .bind::<Array<SmallInt>, _>(objects.owner_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
            .bind::<Array<Nullable<Text>>, _>(objects.object_types)
            .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
            .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
            .bind::<Array<Bytea>, _>(objects.serialized_objects)
            .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
            .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
            .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object mutations with error: {e}");
        })
    }

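    /// Deletes removed objects from `objects`, applying the same
    /// `tx_global_order` filter as [`Self::persist_changed_objects`] (rows
    /// from transactions with a nonzero optimistic sequence number are
    /// skipped).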
    fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_chunks
            .start_timer();
        let len = objects.len();
        let raw_query = r#"
            DELETE FROM objects
            WHERE object_id IN (
                SELECT u.object_id
                FROM UNNEST(
                    $1::BYTEA[],
                    $2::BYTEA[]
                ) AS u(object_id, tx_digest)
                LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
                WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = 0
            )
        "#;
        let (object_ids, tx_digests): (Vec<_>, Vec<_>) = objects
            .into_iter()
            .map(|removed_object| {
                (
                    removed_object.object_id().to_vec(),
                    removed_object.transaction_digest.into_inner().to_vec(),
                )
            })
            .unzip();
        let query = diesel::sql_query(raw_query)
            .bind::<Array<Bytea>, _>(object_ids)
            .bind::<Array<Bytea>, _>(tx_digests);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                query.clone().execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Deleted {len} chunked objects");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object deletions with error: {e}");
        })
    }

    fn persist_object_mutation_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        mutated_object_mutation_chunk: Vec<StoredObject>,
    ) -> Result<(), IndexerError> {
        on_conflict_do_update!(
            objects::table,
            mutated_object_mutation_chunk,
            objects::object_id,
            (
                objects::object_id.eq(excluded(objects::object_id)),
                objects::object_version.eq(excluded(objects::object_version)),
                objects::object_digest.eq(excluded(objects::object_digest)),
                objects::owner_type.eq(excluded(objects::owner_type)),
                objects::owner_id.eq(excluded(objects::owner_id)),
                objects::object_type.eq(excluded(objects::object_type)),
                objects::serialized_object.eq(excluded(objects::serialized_object)),
                objects::coin_type.eq(excluded(objects::coin_type)),
                objects::coin_balance.eq(excluded(objects::coin_balance)),
                objects::df_kind.eq(excluded(objects::df_kind)),
            ),
            conn
        );
        Ok::<(), IndexerError>(())
    }

    fn persist_object_deletion_chunk_in_existing_transaction(
        &self,
        conn: &mut PgConnection,
        deleted_objects_chunk: Vec<StoredDeletedObject>,
    ) -> Result<(), IndexerError> {
        diesel::delete(
            objects::table.filter(
                objects::object_id.eq_any(
                    deleted_objects_chunk
                        .iter()
                        .map(|o| o.object_id.clone())
                        .collect::<Vec<_>>(),
                ),
            ),
        )
        .execute(conn)
        .map_err(IndexerError::from)
        .context("Failed to write object deletion to PostgresDB")?;

        Ok::<(), IndexerError>(())
    }

    fn backfill_objects_snapshot_chunk(
        &self,
        objects_snapshot: Vec<StoredObjectSnapshot>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_snapshot_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for objects_snapshot_chunk in
                    objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    on_conflict_do_update!(
                        objects_snapshot::table,
                        objects_snapshot_chunk,
                        objects_snapshot::object_id,
                        (
                            objects_snapshot::object_version
                                .eq(excluded(objects_snapshot::object_version)),
                            objects_snapshot::object_status
                                .eq(excluded(objects_snapshot::object_status)),
                            objects_snapshot::object_digest
                                .eq(excluded(objects_snapshot::object_digest)),
                            objects_snapshot::checkpoint_sequence_number
                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
                            objects_snapshot::object_type_package
                                .eq(excluded(objects_snapshot::object_type_package)),
                            objects_snapshot::object_type_module
                                .eq(excluded(objects_snapshot::object_type_module)),
                            objects_snapshot::object_type_name
                                .eq(excluded(objects_snapshot::object_type_name)),
                            objects_snapshot::object_type
                                .eq(excluded(objects_snapshot::object_type)),
                            objects_snapshot::serialized_object
                                .eq(excluded(objects_snapshot::serialized_object)),
                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
                            objects_snapshot::coin_balance
                                .eq(excluded(objects_snapshot::coin_balance)),
                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects snapshot",
                objects_snapshot.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object snapshot with error: {e}");
        })
    }

    fn persist_objects_history_chunk(
        &self,
        stored_objects_history: Vec<StoredHistoryObject>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_history_chunks
            .start_timer();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_objects_history_chunk in
                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(
                        objects_history::table,
                        stored_objects_history_chunk,
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked objects history",
                stored_objects_history.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object history with error: {e}");
        })
    }

    fn persist_object_version_chunk(
        &self,
        object_versions: Vec<StoredObjectVersion>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects_version_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked object versions",
                object_versions.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist object versions with error: {e}");
        })
    }

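    /// Persists checkpoints together with their `pruner_cp_watermark` rows.
    /// For the genesis checkpoint (sequence number 0) this also stores the
    /// chain identifier and the initial protocol configs and feature flags.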
    fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
        let Some(first_checkpoint) = checkpoints.first() else {
            return Ok(());
        };

        if first_checkpoint.sequence_number == 0 {
            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
            transactional_blocking_with_retry!(
                &self.blocking_cp,
                |conn| {
                    let checkpoint_digest =
                        first_checkpoint.checkpoint_digest.into_inner().to_vec();
                    insert_or_ignore_into!(
                        chain_identifier::table,
                        StoredChainIdentifier { checkpoint_digest },
                        conn
                    );
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )?;
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_checkpoints
            .start_timer();

        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!(
                "Persisted {} pruner_cp_watermark rows.",
                stored_cp_txs.len(),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
        })?;

        let stored_checkpoints = checkpoints
            .iter()
            .map(StoredCheckpoint::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for stored_checkpoint_chunk in
                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
                {
                    insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
                    let time_now_ms = chrono::Utc::now().timestamp_millis();
                    for stored_checkpoint in stored_checkpoint_chunk {
                        self.metrics
                            .db_commit_lag_ms
                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
                        self.metrics
                            .max_committed_checkpoint_sequence_number
                            .set(stored_checkpoint.sequence_number);
                        self.metrics
                            .committed_checkpoint_timestamp_ms
                            .set(stored_checkpoint.timestamp_ms);
                    }
                    for stored_checkpoint in stored_checkpoint_chunk {
                        info!(
                            "Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}",
                            stored_checkpoint.sequence_number,
                            time_now_ms,
                            stored_checkpoint.timestamp_ms
                        );
                    }
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} checkpoints",
                stored_checkpoints.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist checkpoints with error: {e}");
        })
    }

    fn persist_transactions_chunk(
        &self,
        transactions: Vec<IndexedTransaction>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks
            .start_timer();
        let transformation_guard = self
            .metrics
            .checkpoint_db_commit_latency_transactions_chunks_transformation
            .start_timer();
        let transactions = transactions
            .iter()
            .map(StoredTransaction::from)
            .collect::<Vec<_>>();
        drop(transformation_guard);

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked transactions",
                transactions.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist transactions with error: {e}");
        })
    }

    fn persist_tx_global_order_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(tx_global_order::table, tx_order_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Persisted {} chunked txs insertion order",
                tx_order.len()
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist txs insertion order with error: {e}");
        })
    }

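    /// Reconciles `tx_global_order` rows for checkpointed transactions with
    /// two conditional upserts: rows still marked as `Started` are moved to
    /// `Completed`, and `chk_tx_sequence_number` is filled in for rows where
    /// it is still NULL.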
    fn update_status_for_checkpoint_transactions_chunk(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order_chunks
            .start_timer();

        let num_transactions = tx_order.len();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Completed),
                    tx_global_order::optimistic_sequence_number.eq(IndexStatus::Started),
                    conn
                );
                on_conflict_do_update_with_condition!(
                    tx_global_order::table,
                    tx_order.clone(),
                    tx_global_order::tx_digest,
                    tx_global_order::chk_tx_sequence_number
                        .eq(excluded(tx_global_order::chk_tx_sequence_number)),
                    tx_global_order::chk_tx_sequence_number.is_null(),
                    conn
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(
                elapsed,
                "Updated {} chunked values of `tx_global_order`", num_transactions
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to update `tx_global_order` with error: {e}");
        })
    }

    fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_events_chunks
            .start_timer();
        let len = events.len();
        let events = events
            .into_iter()
            .map(StoredEvent::from)
            .collect::<Vec<_>>();

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(events::table, event_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} chunked events", len);
        })
        .tap_err(|e| {
            tracing::error!("failed to persist events with error: {e}");
        })
    }

    fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
        if packages.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_packages
            .start_timer();
        let packages = packages
            .into_iter()
            .map(StoredPackage::from)
            .collect::<Vec<_>>();
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    on_conflict_do_update!(
                        packages::table,
                        packages_chunk,
                        packages::package_id,
                        (
                            packages::package_id.eq(excluded(packages::package_id)),
                            packages::move_package.eq(excluded(packages::move_package)),
                        ),
                        conn
                    );
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, "Persisted {} packages", packages.len());
        })
        .tap_err(|e| {
            tracing::error!("failed to persist packages with error: {e}");
        })
    }

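    /// Splits each [`EventIndex`] into its seven per-table rows and persists
    /// them concurrently, one blocking task per event-index table.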
    async fn persist_event_indices_chunk(
        &self,
        indices: Vec<EventIndex>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices_chunks
            .start_timer();
        let len = indices.len();
        let (
            event_emit_packages,
            event_emit_modules,
            event_senders,
            event_struct_packages,
            event_struct_modules,
            event_struct_names,
            event_struct_instantiations,
        ) = indices.into_iter().map(|i| i.split()).fold(
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            ),
            |(
                mut event_emit_packages,
                mut event_emit_modules,
                mut event_senders,
                mut event_struct_packages,
                mut event_struct_modules,
                mut event_struct_names,
                mut event_struct_instantiations,
            ),
             index| {
                event_emit_packages.push(index.0);
                event_emit_modules.push(index.1);
                event_senders.push(index.2);
                event_struct_packages.push(index.3);
                event_struct_modules.push(index.4);
                event_struct_names.push(index.5);
                event_struct_instantiations.push(index.6);
                (
                    event_emit_packages,
                    event_emit_modules,
                    event_senders,
                    event_struct_packages,
                    event_struct_modules,
                    event_struct_names,
                    event_struct_instantiations,
                )
            },
        );

        let mut futures = vec![];
        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_package::table,
                event_emit_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_emit_module::table,
                event_emit_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_package::table,
                event_struct_packages,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_module::table,
                event_struct_modules,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_name::table,
                event_struct_names,
                &this.blocking_cp
            )
        }));

        futures.push(self.spawn_blocking_task(move |this| {
            persist_chunk_into_table!(
                event_struct_instantiation::table,
                event_struct_instantiations,
                &this.blocking_cp
            )
        }));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join event indices futures in a chunk: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked event indices", len);
        Ok(())
    }

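    /// Splits each [`TxIndex`] into its per-table rows and persists all ten
    /// transaction-index tables concurrently, one blocking task per table.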
    async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices_chunks
            .start_timer();
        let len = indices.len();

        let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();

        let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
        let recipients: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_recipients.clone())
            .collect();
        let input_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_input_objects.clone())
            .collect();
        let changed_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_changed_objects.clone())
            .collect();
        let wrapped_or_deleted_objects: Vec<_> = splits
            .iter()
            .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
            .collect();
        let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
        let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
        let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
        let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
        let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();

        let futures = [
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_changed_objects::table,
                    changed_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(
                    tx_wrapped_or_deleted_objects::table,
                    wrapped_or_deleted_objects,
                    &this.blocking_cp
                )
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
            }),
            self.spawn_blocking_task(move |this| {
                persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
            }),
        ];

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join tx indices futures in a chunk: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx indices in a chunk: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} chunked tx_indices", len);
        Ok(())
    }

    fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_epoch
            .start_timer();
        let epoch_id = epoch.new_epoch.epoch;

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                if let Some(last_epoch) = &epoch.last_epoch {
                    info!(last_epoch.epoch, "Persisting epoch end data.");
                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
                        .set(last_epoch)
                        .execute(conn)?;
                }

                info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
                insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = guard.stop_and_record();
            info!(elapsed, epoch_id, "Persisted epoch beginning info");
        })
        .tap_err(|e| {
            tracing::error!("failed to persist epoch with error: {e}");
        })
    }

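    /// Advances epoch-partitioned tables into the new epoch: for every table
    /// whose partitioning strategy is epoch-based, the partition manager
    /// creates the next partition from the last epoch's boundary data.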
    fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
        if let Some(last_epoch_id) = last_epoch_id {
            let last_db_epoch: Option<StoredEpochInfo> =
                read_only_blocking!(&self.blocking_cp, |conn| {
                    epochs::table
                        .filter(epochs::epoch.eq(last_epoch_id))
                        .first::<StoredEpochInfo>(conn)
                        .optional()
                })
                .context("Failed to read last epoch from PostgresDB")?;
            if let Some(last_epoch) = last_db_epoch {
                let epoch_partition_data =
                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
                let table_partitions = self.partition_manager.get_table_partitions()?;
                for (table, (_, last_partition)) in table_partitions {
                    if !self
                        .partition_manager
                        .get_strategy(&table)
                        .is_epoch_partitioned()
                    {
                        continue;
                    }
                    let guard = self.metrics.advance_epoch_latency.start_timer();
                    self.partition_manager.advance_epoch(
                        table.clone(),
                        last_partition,
                        &epoch_partition_data,
                    )?;
                    let elapsed = guard.stop_and_record();
                    info!(
                        elapsed,
                        "Advanced epoch partition {} for table {}",
                        last_partition,
                        table.clone()
                    );
                }
            } else {
                tracing::error!("last epoch: {last_epoch_id} from PostgresDB is None.");
            }
        }

        Ok(())
    }

    fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune checkpoints table")?;

                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    event_emit_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_emit_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_emit_package table"
                );
                prune_tx_or_event_indice_table!(
                    event_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_senders table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_instantiation,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_instantiation table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_module,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_module table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_name,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_name table"
                );
                prune_tx_or_event_indice_table!(
                    event_struct_package,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune event_struct_package table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                prune_tx_or_event_indice_table!(
                    tx_senders,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_senders table"
                );
                prune_tx_or_event_indice_table!(
                    tx_recipients,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_recipients table"
                );
                prune_tx_or_event_indice_table!(
                    tx_input_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_input_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_changed_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_changed_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_wrapped_or_deleted_objects,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_wrapped_or_deleted_objects table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_pkg,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_pkg table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_mod,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_mod table"
                );
                prune_tx_or_event_indice_table!(
                    tx_calls_fun,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_calls_fun table"
                );
                prune_tx_or_event_indice_table!(
                    tx_digests,
                    conn,
                    min_tx,
                    max_tx,
                    "Failed to prune tx_digests table"
                );
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::delete(
                    pruner_cp_watermark::table
                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
                )
                .execute(conn)
                .map_err(IndexerError::from)
                .context("Failed to prune pruner_cp_watermark table")?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
    }

    fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        read_only_blocking!(&self.blocking_cp, |conn| {
            epochs::table
                .filter(epochs::epoch.eq(epoch as i64))
                .select(epochs::network_total_transactions)
                .get_result::<Option<i64>>(conn)
        })
        .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
        .map(|option| option.map(|v| v as u64))
    }

    fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
                    .execute(conn)?;
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            info!("Successfully refreshed participation_metrics");
        })
        .tap_err(|e| {
            tracing::error!("failed to refresh participation_metrics: {e}");
        })
    }

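    /// Runs `f` on a dedicated blocking thread via
    /// `tokio::task::spawn_blocking`, propagating the current tracing span,
    /// and flattens the join result into a single `Result`.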
    async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            f(this)
        })
        .await
        .map_err(Into::into)
        .and_then(std::convert::identity)
    }

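    /// Like [`Self::execute_in_blocking_worker`], but returns the
    /// `JoinHandle` instead of awaiting it, and records how long the task
    /// waited before a blocking thread picked it up.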
    fn spawn_blocking_task<F, R>(
        &self,
        f: F,
    ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let current_span = tracing::Span::current();
        let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
        tokio::task::spawn_blocking(move || {
            let _guard = current_span.enter();
            let _elapsed = guard.stop_and_record();
            f(this)
        })
    }

    fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        tokio::task::spawn(async move { f(this).await })
    }
}

#[async_trait]
impl IndexerStore for PgIndexerStore {
    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
            .await
    }

    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
            .await
    }

    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
            .await
    }

    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        self.execute_in_blocking_worker(|this| this.get_chain_identifier())
            .await
    }

    async fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(|this| {
            this.get_latest_object_snapshot_checkpoint_sequence_number()
        })
        .await
    }

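    /// Persists object changes inside an already-open transaction, first
    /// deduplicating the batch via `retain_latest_indexed_objects` so that,
    /// presumably, only the latest mutation or deletion per object is written.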
1621 fn persist_objects_in_existing_transaction(
1622 &self,
1623 conn: &mut PgConnection,
1624 object_changes: Vec<TransactionObjectChangesToCommit>,
1625 ) -> Result<(), IndexerError> {
1626 if object_changes.is_empty() {
1627 return Ok(());
1628 }
1629
1630 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1631 let object_mutations = indexed_mutations
1632 .into_iter()
1633 .map(StoredObject::from)
1634 .collect::<Vec<_>>();
1635 let object_deletions = indexed_deletions
1636 .into_iter()
1637 .map(StoredDeletedObject::from)
1638 .collect::<Vec<_>>();
1639
1640 self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1641 self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1642
1643 Ok(())
1644 }
1645
1646 async fn persist_objects_snapshot(
1647 &self,
1648 object_changes: Vec<TransactionObjectChangesToCommit>,
1649 ) -> Result<(), IndexerError> {
1650 if object_changes.is_empty() {
1651 return Ok(());
1652 }
1653 let guard = self
1654 .metrics
1655 .checkpoint_db_commit_latency_objects_snapshot
1656 .start_timer();
1657 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1658 let objects_snapshot = indexed_mutations
1659 .into_iter()
1660 .map(StoredObjectSnapshot::from)
1661 .chain(
1662 indexed_deletions
1663 .into_iter()
1664 .map(StoredObjectSnapshot::from),
1665 )
1666 .collect::<Vec<_>>();
1667 let len = objects_snapshot.len();
1668 let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1669 let futures = chunks
1670 .into_iter()
1671 .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1672
1673 futures::future::try_join_all(futures)
1674 .await
1675 .map_err(|e| {
1676 tracing::error!("failed to join backfill_objects_snapshot_chunk futures: {e}");
1677 IndexerError::from(e)
1678 })?
1679 .into_iter()
1680 .collect::<Result<Vec<_>, _>>()
1681 .map_err(|e| {
1682 IndexerError::PostgresWrite(format!(
1683 "Failed to persist all objects snapshot chunks: {e:?}"
1684 ))
1685 })?;
1686 let elapsed = guard.stop_and_record();
1687 info!(elapsed, "Persisted {} objects snapshot", len);
1688 Ok(())
1689 }
1690
1691 async fn persist_object_history(
1692 &self,
1693 object_changes: Vec<TransactionObjectChangesToCommit>,
1694 ) -> Result<(), IndexerError> {
1695 let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1696 .map(|val| val.eq_ignore_ascii_case("true"))
1697 .unwrap_or(false);
1698 if skip_history {
1699 info!("skipping object history");
1700 return Ok(());
1701 }
1702
1703 if object_changes.is_empty() {
1704 return Ok(());
1705 }
1706 let objects = make_objects_history_to_commit(object_changes);
1707 let guard = self
1708 .metrics
1709 .checkpoint_db_commit_latency_objects_history
1710 .start_timer();
1711
1712 let len = objects.len();
1713 let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1714 let futures = chunks
1715 .into_iter()
1716 .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1717
1718 futures::future::try_join_all(futures)
1719 .await
1720 .map_err(|e| {
1721 tracing::error!("failed to join persist_objects_history_chunk futures: {e}");
1722 IndexerError::from(e)
1723 })?
1724 .into_iter()
1725 .collect::<Result<Vec<_>, _>>()
1726 .map_err(|e| {
1727 IndexerError::PostgresWrite(format!(
1728 "Failed to persist all objects history chunks: {e:?}"
1729 ))
1730 })?;
1731 let elapsed = guard.stop_and_record();
1732 info!(elapsed, "Persisted {} objects history", len);
1733 Ok(())
1734 }
1735
1736 async fn persist_object_versions(
1737 &self,
1738 object_versions: Vec<StoredObjectVersion>,
1739 ) -> Result<(), IndexerError> {
1740 if object_versions.is_empty() {
1741 return Ok(());
1742 }
1743
1744 let guard = self
1745 .metrics
1746 .checkpoint_db_commit_latency_objects_version
1747 .start_timer();
1748
1749 let object_versions_count = object_versions.len();
1750
1751 let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1752 let futures = chunks
1753 .into_iter()
1754 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1755 .collect::<Vec<_>>();
1756
1757 futures::future::try_join_all(futures)
1758 .await
1759 .map_err(|e| {
1760 tracing::error!("failed to join persist_object_version_chunk futures: {e}");
1761 IndexerError::from(e)
1762 })?
1763 .into_iter()
1764 .collect::<Result<Vec<_>, _>>()
1765 .map_err(|e| {
1766 IndexerError::PostgresWrite(format!(
1767 "Failed to persist all objects version chunks: {e:?}"
1768 ))
1769 })?;
1770 let elapsed = guard.stop_and_record();
1771 info!(elapsed, "Persisted {object_versions_count} object versions");
1772 Ok(())
1773 }
1774
1775 async fn persist_checkpoints(
1776 &self,
1777 checkpoints: Vec<IndexedCheckpoint>,
1778 ) -> Result<(), IndexerError> {
1779 self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1780 .await
1781 }
1782
1783 async fn persist_transactions(
1784 &self,
1785 transactions: Vec<IndexedTransaction>,
1786 ) -> Result<(), IndexerError> {
1787 let guard = self
1788 .metrics
1789 .checkpoint_db_commit_latency_transactions
1790 .start_timer();
1791 let len = transactions.len();
1792
1793 let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1794 let futures = chunks
1795 .into_iter()
1796 .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1797
1798 futures::future::try_join_all(futures)
1799 .await
1800 .map_err(|e| {
1801 tracing::error!("failed to join persist_transactions_chunk futures: {e}");
1802 IndexerError::from(e)
1803 })?
1804 .into_iter()
1805 .collect::<Result<Vec<_>, _>>()
1806 .map_err(|e| {
1807 IndexerError::PostgresWrite(format!(
1808 "Failed to persist all transactions chunks: {e:?}"
1809 ))
1810 })?;
1811 let elapsed = guard.stop_and_record();
1812 info!(elapsed, "Persisted {} transactions", len);
1813 Ok(())
1814 }
1815
1816 fn persist_optimistic_transaction_in_existing_transaction(
1817 &self,
1818 conn: &mut PgConnection,
1819 transaction: OptimisticTransaction,
1820 ) -> Result<(), IndexerError> {
1821 insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1822 Ok(())
1823 }
1824
1825 async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1826 if events.is_empty() {
1827 return Ok(());
1828 }
1829 let len = events.len();
1830 let guard = self
1831 .metrics
1832 .checkpoint_db_commit_latency_events
1833 .start_timer();
1834 let chunks = chunk!(events, self.config.parallel_chunk_size);
1835 let futures = chunks
1836 .into_iter()
1837 .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1838
1839 futures::future::try_join_all(futures)
1840 .await
1841 .map_err(|e| {
1842 tracing::error!("failed to join persist_events_chunk futures: {e}");
1843 IndexerError::from(e)
1844 })?
1845 .into_iter()
1846 .collect::<Result<Vec<_>, _>>()
1847 .map_err(|e| {
1848 IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
1849 })?;
1850 let elapsed = guard.stop_and_record();
1851 info!(elapsed, "Persisted {} events", len);
1852 Ok(())
1853 }
1854
1855 async fn persist_displays(
1856 &self,
1857 display_updates: BTreeMap<String, StoredDisplay>,
1858 ) -> Result<(), IndexerError> {
1859 if display_updates.is_empty() {
1860 return Ok(());
1861 }
1862
1863 self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
1864 .await?
1865 }
1866
1867 fn persist_displays_in_existing_transaction(
1868 &self,
1869 conn: &mut PgConnection,
1870 display_updates: Vec<&StoredDisplay>,
1871 ) -> Result<(), IndexerError> {
1872 if display_updates.is_empty() {
1873 return Ok(());
1874 }
1875
1876 on_conflict_do_update_with_condition!(
1877 display::table,
1878 display_updates,
1879 display::object_type,
1880 (
1881 display::id.eq(excluded(display::id)),
1882 display::version.eq(excluded(display::version)),
1883 display::bcs.eq(excluded(display::bcs)),
1884 ),
1885 excluded(display::version).gt(display::version),
1886 conn
1887 );
1888
1889 Ok(())
1890 }
1891
1892 async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1893 if packages.is_empty() {
1894 return Ok(());
1895 }
1896 self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
1897 .await
1898 }
1899
    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_event_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_event_indices_chunk(chunk).await
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all event_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} event_indices", len);
        Ok(())
    }

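    /// Persists epoch data; delegates to the inherent blocking
    /// `persist_epoch`, so this is dispatch to another method, not recursion.
    /// The same pattern applies to `advance_epoch` below.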
    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
            .await
    }

    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
            .await
    }

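    /// Prunes all checkpoints of `epoch`, oldest first. For each checkpoint
    /// this removes the checkpoint row, the transaction and event index rows
    /// for its transaction range, and finally its `pruner_cp_watermark`
    /// entry, updating the pruner metrics as it goes. The lower bound is
    /// clamped to `get_min_prunable_checkpoint` so already-pruned checkpoints
    /// are skipped, and individual failures are logged rather than aborting
    /// the epoch, so pruning can make progress and be retried.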
    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
            _ => Err(IndexerError::PostgresRead(format!(
                "Failed to get checkpoint range for epoch {epoch}"
            ))),
        }?;

        let min_prunable_cp = self.get_min_prunable_checkpoint()?;
        min_cp = std::cmp::max(min_cp, min_prunable_cp);
        for cp in min_cp..=max_cp {
            info!(
                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
                cp, epoch, min_prunable_cp
            );
            self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune checkpoint {cp}: {e}");
                });

            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
            self.execute_in_blocking_worker(move |this| {
                this.prune_tx_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune tx_indices for cp {cp}: {e}");
            });
            info!(
                "Pruned tx_indices for checkpoint {} from tx {} to tx {}",
                cp, min_tx, max_tx
            );
            self.execute_in_blocking_worker(move |this| {
                this.prune_event_indices_table(min_tx, max_tx)
            })
            .await
            .unwrap_or_else(|e| {
                tracing::error!("failed to prune event_indices for cp {cp}: {e}");
            });
            info!(
                "Pruned event_indices for checkpoint {cp} from tx {min_tx} to tx {max_tx}"
            );
            self.metrics.last_pruned_transaction.set(max_tx as i64);

            self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
                .await
                .unwrap_or_else(|e| {
                    tracing::error!("failed to prune pruner_cp_watermark table for cp {cp}: {e}");
                });
            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
            self.metrics.last_pruned_checkpoint.set(cp as i64);
        }

        Ok(())
    }

    async fn get_network_total_transactions_by_end_of_epoch(
        &self,
        epoch: u64,
    ) -> Result<Option<u64>, IndexerError> {
        self.execute_in_blocking_worker(move |this| {
            this.get_network_total_transactions_by_end_of_epoch(epoch)
        })
        .await
    }

    async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
        self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
            .await
    }

    fn as_any(&self) -> &dyn StdAny {
        self
    }

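    /// Resolves the chain from `chain_id`, then materializes every supported
    /// protocol config and feature flag across the indexed protocol version
    /// range and writes them with insert-or-ignore semantics, so re-running
    /// for already-stored versions is a no-op.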
    fn persist_protocol_configs_and_feature_flags(
        &self,
        chain_id: Vec<u8>,
    ) -> Result<(), IndexerError> {
        let chain_id = ChainIdentifier::from(
            CheckpointDigest::try_from(chain_id).expect("unable to convert chain id"),
        );

        let mut all_configs = vec![];
        let mut all_flags = vec![];

        let (start_version, end_version) = self.get_protocol_version_index_range()?;
        info!(
            "Persisting protocol configs with start_version: {}, end_version: {}",
            start_version, end_version
        );

        for version in start_version..=end_version {
            // Fetch the config for this version; unsupported versions are an error.
            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
                (version as u64).into(),
                chain_id.chain(),
            )
            .ok_or(IndexerError::Generic(format!(
                "Unable to fetch protocol config for version {} and chain {:?}",
                version,
                chain_id.chain()
            )))?;
            let configs_vec = protocol_configs
                .attr_map()
                .into_iter()
                .map(|(k, v)| StoredProtocolConfig {
                    protocol_version: version,
                    config_name: k,
                    config_value: v.map(|v| v.to_string()),
                })
                .collect::<Vec<_>>();
            all_configs.extend(configs_vec);

            let feature_flags = protocol_configs
                .feature_map()
                .into_iter()
                .map(|(k, v)| StoredFeatureFlag {
                    protocol_version: version,
                    flag_name: k,
                    flag_value: v,
                })
                .collect::<Vec<_>>();
            all_flags.extend(feature_flags);
        }

        transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
                }
                // Chunk the flags as well so a single statement never exceeds
                // the intra-transaction commit size used for the configs.
                for flag_chunk in all_flags.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                    insert_or_ignore_into!(feature_flags::table, flag_chunk, conn);
                }
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )?;
        Ok(())
    }

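    /// Persists transaction index rows in parallel async chunks via
    /// `persist_tx_indices_chunk_v2`, mirroring `persist_event_indices`.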
    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
        if indices.is_empty() {
            return Ok(());
        }
        let len = indices.len();
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_indices
            .start_timer();
        let chunks = chunk!(indices, self.config.parallel_chunk_size);

        let futures = chunks.into_iter().map(|chunk| {
            self.spawn_task(move |this: Self| async move {
                this.persist_tx_indices_chunk_v2(chunk).await
            })
        });
        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_indices_chunk_v2 futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all tx_indices chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {} tx_indices", len);
        Ok(())
    }

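    /// Persists the object changes of a checkpoint batch. Changes are first
    /// deduplicated so only the latest state of each object in the batch
    /// survives, then mutations and deletions are written concurrently in
    /// `parallel_objects_chunk_size`-sized chunks on blocking workers.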
    async fn persist_checkpoint_objects(
        &self,
        objects: Vec<CheckpointObjectChanges>,
    ) -> Result<(), IndexerError> {
        if objects.is_empty() {
            return Ok(());
        }
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_objects
            .start_timer();
        let CheckpointObjectChanges {
            changed_objects: mutations,
            deleted_objects: deletions,
        } = retain_latest_objects_from_checkpoint_batch(objects);
        let mutation_len = mutations.len();
        let deletion_len = deletions.len();

        let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
        let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
        let mutation_futures = mutation_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
        let deletion_futures = deletion_chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
        futures::future::try_join_all(mutation_futures.chain(deletion_futures))
            .await
            .map_err(|e| {
                tracing::error!("failed to join futures for persisting objects: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}"))
            })?;

        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
        );
        Ok(())
    }

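    /// Updates the index status of already-persisted `tx_global_order` rows
    /// for the given checkpoint transactions, processing the orders in
    /// parallel chunks on blocking workers.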
    async fn update_status_for_checkpoint_transactions(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks.into_iter().map(|c| {
            self.spawn_blocking_task(move |this| {
                this.update_status_for_checkpoint_transactions_chunk(c)
            })
        });

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!(
                    "failed to join update_status_for_checkpoint_transactions_chunk futures: {e}"
                );
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to update all `tx_global_order` chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(
            elapsed,
            "Updated index status for {len} txs insertion orders"
        );
        Ok(())
    }

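    /// Persists the global insertion order of checkpoint transactions into
    /// `tx_global_order`, writing the orders in parallel chunks on blocking
    /// workers.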
    async fn persist_tx_global_order(
        &self,
        tx_order: Vec<CheckpointTxGlobalOrder>,
    ) -> Result<(), IndexerError> {
        let guard = self
            .metrics
            .checkpoint_db_commit_latency_tx_insertion_order
            .start_timer();
        let len = tx_order.len();

        let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
        let futures = chunks
            .into_iter()
            .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));

        futures::future::try_join_all(futures)
            .await
            .map_err(|e| {
                tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}");
                IndexerError::from(e)
            })?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                IndexerError::PostgresWrite(format!(
                    "Failed to persist all txs insertion order chunks: {e:?}"
                ))
            })?;
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted {len} txs insertion orders");
        Ok(())
    }
}

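/// Flattens per-transaction object changes into `StoredHistoryObject` rows
/// for the `objects_history` table, emitting deletions before mutations.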
fn make_objects_history_to_commit(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Vec<StoredHistoryObject> {
    // Collect both kinds of change in a single pass instead of cloning the
    // entire change set; deletions are emitted before mutations.
    let mut deleted_objects: Vec<StoredHistoryObject> = vec![];
    let mut mutated_objects: Vec<StoredHistoryObject> = vec![];
    for changes in tx_object_changes {
        deleted_objects.extend(changes.deleted_objects.into_iter().map(Into::into));
        mutated_objects.extend(changes.changed_objects.into_iter().map(Into::into));
    }
    deleted_objects.into_iter().chain(mutated_objects).collect()
}

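/// Collapses a batch of per-transaction object changes down to the latest
/// change per object ID: a newer mutation drops an earlier deletion of the
/// same object and vice versa, and the asserts enforce that versions within
/// a batch are strictly increasing, so the result holds at most one entry
/// per object across both returned collections.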
fn retain_latest_indexed_objects(
    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
    use std::collections::HashMap;

    let mut mutations = HashMap::<ObjectID, IndexedObject>::new();
    let mut deletions = HashMap::<ObjectID, IndexedDeletedObject>::new();

    for change in tx_object_changes {
        // A mutation supersedes any earlier deletion of the same object.
        for mutation in change.changed_objects {
            let id = mutation.object.id();
            let version = mutation.object.version();

            if let Some(existing) = deletions.remove(&id) {
                assert!(
                    existing.object_version < version.value(),
                    "mutation version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }

            if let Some(existing) = mutations.insert(id, mutation) {
                assert!(
                    existing.object.version() < version,
                    "mutation version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version()
                );
            }
        }
        // A deletion supersedes any earlier mutation of the same object.
        for deletion in change.deleted_objects {
            let id = deletion.object_id;
            let version = deletion.object_version;

            if let Some(existing) = mutations.remove(&id) {
                assert!(
                    existing.object.version().value() < version,
                    "deletion version ({version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
                    existing.object.version(),
                );
            }

            if let Some(existing) = deletions.insert(id, deletion) {
                assert!(
                    existing.object_version < version,
                    "deletion version ({version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
                    existing.object_version
                );
            }
        }
    }

    (
        mutations.into_values().collect(),
        deletions.into_values().collect(),
    )
}