1use core::result::Result::Ok;
6use std::{
7 any::Any as StdAny,
8 collections::{BTreeMap, HashMap},
9 time::{Duration, Instant},
10};
11
12use async_trait::async_trait;
13use diesel::{
14 ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
15 dsl::{max, min},
16 upsert::excluded,
17};
18use downcast::Any;
19use futures::future::Either;
20use iota_protocol_config::ProtocolConfig;
21use iota_types::{
22 base_types::ObjectID,
23 digests::{ChainIdentifier, CheckpointDigest},
24};
25use itertools::Itertools;
26use tap::TapFallible;
27use tracing::info;
28
29use super::{
30 IndexerStore,
31 pg_partition_manager::{EpochPartitionData, PgPartitionManager},
32};
33use crate::{
34 db::ConnectionPool,
35 errors::{Context, IndexerError},
36 handlers::{EpochToCommit, TransactionObjectChangesToCommit},
37 insert_or_ignore_into,
38 metrics::IndexerMetrics,
39 models::{
40 checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
41 display::StoredDisplay,
42 epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
43 event_indices::OptimisticEventIndices,
44 events::{OptimisticEvent, StoredEvent},
45 obj_indices::StoredObjectVersion,
46 objects::{StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot},
47 packages::StoredPackage,
48 transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
49 tx_indices::OptimisticTxIndices,
50 },
51 on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
52 schema::{
53 chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
54 event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
55 event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
56 objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
57 optimistic_event_senders, optimistic_event_struct_instantiation,
58 optimistic_event_struct_module, optimistic_event_struct_name,
59 optimistic_event_struct_package, optimistic_events, optimistic_transactions,
60 optimistic_tx_calls_fun, optimistic_tx_calls_mod, optimistic_tx_calls_pkg,
61 optimistic_tx_changed_objects, optimistic_tx_input_objects, optimistic_tx_kinds,
62 optimistic_tx_recipients, optimistic_tx_senders, packages, protocol_configs,
63 pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
64 tx_changed_objects, tx_digests, tx_input_objects, tx_insertion_order, tx_kinds,
65 tx_recipients, tx_senders,
66 },
67 transactional_blocking_with_retry,
68 types::{
69 EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
70 IndexedPackage, IndexedTransaction, TxIndex,
71 },
72};
73
74#[macro_export]
75macro_rules! chunk {
76 ($data: expr, $size: expr) => {{
77 $data
78 .into_iter()
79 .chunks($size)
80 .into_iter()
81 .map(|c| c.collect())
82 .collect::<Vec<Vec<_>>>()
83 }};
84}
85
86macro_rules! prune_tx_or_event_indice_table {
87 ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
88 diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
89 .execute($conn)
90 .map_err(IndexerError::from)
91 .context($context_msg)?;
92 };
93}
94
95const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
100const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
102const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
106const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
107
108#[derive(Clone)]
109pub struct PgIndexerStoreConfig {
110 pub parallel_chunk_size: usize,
111 pub parallel_objects_chunk_size: usize,
112 #[expect(unused)]
113 pub epochs_to_keep: Option<u64>,
114}
115
116pub struct PgIndexerStore {
117 blocking_cp: ConnectionPool,
118 metrics: IndexerMetrics,
119 partition_manager: PgPartitionManager,
120 config: PgIndexerStoreConfig,
121}
122
123impl Clone for PgIndexerStore {
124 fn clone(&self) -> PgIndexerStore {
125 Self {
126 blocking_cp: self.blocking_cp.clone(),
127 metrics: self.metrics.clone(),
128 partition_manager: self.partition_manager.clone(),
129 config: self.config.clone(),
130 }
131 }
132}
133
134impl PgIndexerStore {
135 pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
136 let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
137 .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
138 .parse::<usize>()
139 .unwrap();
140 let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
141 .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
142 .parse::<usize>()
143 .unwrap();
144 let epochs_to_keep = std::env::var("EPOCHS_TO_KEEP")
145 .map(|s| s.parse::<u64>().ok())
146 .unwrap_or_else(|_e| None);
147 let partition_manager = PgPartitionManager::new(blocking_cp.clone())
148 .expect("Failed to initialize partition manager");
149 let config = PgIndexerStoreConfig {
150 parallel_chunk_size,
151 parallel_objects_chunk_size,
152 epochs_to_keep,
153 };
154
155 Self {
156 blocking_cp,
157 metrics,
158 partition_manager,
159 config,
160 }
161 }
162
163 pub fn blocking_cp(&self) -> ConnectionPool {
164 self.blocking_cp.clone()
165 }
166
167 pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
168 read_only_blocking!(&self.blocking_cp, |conn| {
169 epochs::dsl::epochs
170 .select(max(epochs::epoch))
171 .first::<Option<i64>>(conn)
172 .map(|v| v.map(|v| v as u64))
173 })
174 .context("Failed reading latest epoch id from PostgresDB")
175 }
176
177 pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
179 let start = read_only_blocking!(&self.blocking_cp, |conn| {
182 protocol_configs::dsl::protocol_configs
183 .select(max(protocol_configs::protocol_version))
184 .first::<Option<i64>>(conn)
185 })
186 .context("Failed reading latest protocol version from PostgresDB")?
187 .map_or(1, |v| v + 1);
188
189 let end = read_only_blocking!(&self.blocking_cp, |conn| {
191 epochs::dsl::epochs
192 .select(max(epochs::protocol_version))
193 .first::<Option<i64>>(conn)
194 })
195 .context("Failed reading latest epoch protocol version from PostgresDB")?
196 .unwrap_or(1);
197 Ok((start, end))
198 }
199
200 pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
201 read_only_blocking!(&self.blocking_cp, |conn| {
202 chain_identifier::dsl::chain_identifier
203 .select(chain_identifier::checkpoint_digest)
204 .first::<Vec<u8>>(conn)
205 .optional()
206 })
207 .context("Failed reading chain id from PostgresDB")
208 }
209
210 fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
211 read_only_blocking!(&self.blocking_cp, |conn| {
212 checkpoints::dsl::checkpoints
213 .select(max(checkpoints::sequence_number))
214 .first::<Option<i64>>(conn)
215 .map(|v| v.map(|v| v as u64))
216 })
217 .context("Failed reading latest checkpoint sequence number from PostgresDB")
218 }
219
220 fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
221 read_only_blocking!(&self.blocking_cp, |conn| {
222 checkpoints::dsl::checkpoints
223 .select((
224 min(checkpoints::sequence_number),
225 max(checkpoints::sequence_number),
226 ))
227 .first::<(Option<i64>, Option<i64>)>(conn)
228 .map(|(min, max)| {
229 (
230 min.unwrap_or_default() as u64,
231 max.unwrap_or_default() as u64,
232 )
233 })
234 })
235 .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
236 }
237
238 fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
239 read_only_blocking!(&self.blocking_cp, |conn| {
240 epochs::dsl::epochs
241 .select((min(epochs::epoch), max(epochs::epoch)))
242 .first::<(Option<i64>, Option<i64>)>(conn)
243 .map(|(min, max)| {
244 (
245 min.unwrap_or_default() as u64,
246 max.unwrap_or_default() as u64,
247 )
248 })
249 })
250 .context("Failed reading min and max epoch numbers from PostgresDB")
251 }
252
253 fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
254 read_only_blocking!(&self.blocking_cp, |conn| {
255 pruner_cp_watermark::dsl::pruner_cp_watermark
256 .select(min(pruner_cp_watermark::checkpoint_sequence_number))
257 .first::<Option<i64>>(conn)
258 .map(|v| v.unwrap_or_default() as u64)
259 })
260 .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
261 }
262
263 fn get_checkpoint_range_for_epoch(
264 &self,
265 epoch: u64,
266 ) -> Result<(u64, Option<u64>), IndexerError> {
267 read_only_blocking!(&self.blocking_cp, |conn| {
268 epochs::dsl::epochs
269 .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
270 .filter(epochs::epoch.eq(epoch as i64))
271 .first::<(i64, Option<i64>)>(conn)
272 .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
273 })
274 .context("Failed reading checkpoint range from PostgresDB")
275 }
276
277 fn get_transaction_range_for_checkpoint(
278 &self,
279 checkpoint: u64,
280 ) -> Result<(u64, u64), IndexerError> {
281 read_only_blocking!(&self.blocking_cp, |conn| {
282 pruner_cp_watermark::dsl::pruner_cp_watermark
283 .select((
284 pruner_cp_watermark::min_tx_sequence_number,
285 pruner_cp_watermark::max_tx_sequence_number,
286 ))
287 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
288 .first::<(i64, i64)>(conn)
289 .map(|(min, max)| (min as u64, max as u64))
290 })
291 .context("Failed reading transaction range from PostgresDB")
292 }
293
294 fn get_latest_object_snapshot_checkpoint_sequence_number(
295 &self,
296 ) -> Result<Option<u64>, IndexerError> {
297 read_only_blocking!(&self.blocking_cp, |conn| {
298 objects_snapshot::dsl::objects_snapshot
299 .select(max(objects_snapshot::checkpoint_sequence_number))
300 .first::<Option<i64>>(conn)
301 .map(|v| v.map(|v| v as u64))
302 })
303 .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
304 }
305
306 fn persist_display_updates(
307 &self,
308 display_updates: BTreeMap<String, StoredDisplay>,
309 ) -> Result<(), IndexerError> {
310 transactional_blocking_with_retry!(
311 &self.blocking_cp,
312 |conn| {
313 on_conflict_do_update!(
314 display::table,
315 display_updates.values().collect::<Vec<_>>(),
316 display::object_type,
317 (
318 display::id.eq(excluded(display::id)),
319 display::version.eq(excluded(display::version)),
320 display::bcs.eq(excluded(display::bcs)),
321 ),
322 conn
323 );
324 Ok::<(), IndexerError>(())
325 },
326 PG_DB_COMMIT_SLEEP_DURATION
327 )?;
328
329 Ok(())
330 }
331
332 fn persist_object_mutation_chunk(
333 &self,
334 mutated_object_mutation_chunk: Vec<StoredObject>,
335 ) -> Result<(), IndexerError> {
336 let guard = self
337 .metrics
338 .checkpoint_db_commit_latency_objects_chunks
339 .start_timer();
340 let len = mutated_object_mutation_chunk.len();
341 transactional_blocking_with_retry!(
342 &self.blocking_cp,
343 |conn| {
344 on_conflict_do_update!(
345 objects::table,
346 mutated_object_mutation_chunk.clone(),
347 objects::object_id,
348 (
349 objects::object_id.eq(excluded(objects::object_id)),
350 objects::object_version.eq(excluded(objects::object_version)),
351 objects::object_digest.eq(excluded(objects::object_digest)),
352 objects::owner_type.eq(excluded(objects::owner_type)),
353 objects::owner_id.eq(excluded(objects::owner_id)),
354 objects::object_type.eq(excluded(objects::object_type)),
355 objects::serialized_object.eq(excluded(objects::serialized_object)),
356 objects::coin_type.eq(excluded(objects::coin_type)),
357 objects::coin_balance.eq(excluded(objects::coin_balance)),
358 objects::df_kind.eq(excluded(objects::df_kind)),
359 ),
360 conn
361 );
362 Ok::<(), IndexerError>(())
363 },
364 PG_DB_COMMIT_SLEEP_DURATION
365 )
366 .tap_ok(|_| {
367 let elapsed = guard.stop_and_record();
368 info!(elapsed, "Persisted {} chunked objects", len);
369 })
370 .tap_err(|e| {
371 tracing::error!("Failed to persist object mutations with error: {}", e);
372 })
373 }
374
375 fn persist_object_deletion_chunk(
376 &self,
377 deleted_objects_chunk: Vec<StoredDeletedObject>,
378 ) -> Result<(), IndexerError> {
379 let guard = self
380 .metrics
381 .checkpoint_db_commit_latency_objects_chunks
382 .start_timer();
383 let len = deleted_objects_chunk.len();
384 transactional_blocking_with_retry!(
385 &self.blocking_cp,
386 |conn| {
387 diesel::delete(
388 objects::table.filter(
389 objects::object_id.eq_any(
390 deleted_objects_chunk
391 .iter()
392 .map(|o| o.object_id.clone())
393 .collect::<Vec<_>>(),
394 ),
395 ),
396 )
397 .execute(conn)
398 .map_err(IndexerError::from)
399 .context("Failed to write object deletion to PostgresDB")?;
400
401 Ok::<(), IndexerError>(())
402 },
403 PG_DB_COMMIT_SLEEP_DURATION
404 )
405 .tap_ok(|_| {
406 let elapsed = guard.stop_and_record();
407 info!(elapsed, "Deleted {} chunked objects", len);
408 })
409 .tap_err(|e| {
410 tracing::error!("Failed to persist object deletions with error: {}", e);
411 })
412 }
413
414 fn backfill_objects_snapshot_chunk(
415 &self,
416 objects_snapshot: Vec<StoredObjectSnapshot>,
417 ) -> Result<(), IndexerError> {
418 let guard = self
419 .metrics
420 .checkpoint_db_commit_latency_objects_snapshot_chunks
421 .start_timer();
422 transactional_blocking_with_retry!(
423 &self.blocking_cp,
424 |conn| {
425 for objects_snapshot_chunk in
426 objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
427 {
428 on_conflict_do_update!(
429 objects_snapshot::table,
430 objects_snapshot_chunk,
431 objects_snapshot::object_id,
432 (
433 objects_snapshot::object_version
434 .eq(excluded(objects_snapshot::object_version)),
435 objects_snapshot::object_status
436 .eq(excluded(objects_snapshot::object_status)),
437 objects_snapshot::object_digest
438 .eq(excluded(objects_snapshot::object_digest)),
439 objects_snapshot::checkpoint_sequence_number
440 .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
441 objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
442 objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
443 objects_snapshot::object_type_package
444 .eq(excluded(objects_snapshot::object_type_package)),
445 objects_snapshot::object_type_module
446 .eq(excluded(objects_snapshot::object_type_module)),
447 objects_snapshot::object_type_name
448 .eq(excluded(objects_snapshot::object_type_name)),
449 objects_snapshot::object_type
450 .eq(excluded(objects_snapshot::object_type)),
451 objects_snapshot::serialized_object
452 .eq(excluded(objects_snapshot::serialized_object)),
453 objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
454 objects_snapshot::coin_balance
455 .eq(excluded(objects_snapshot::coin_balance)),
456 objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
457 ),
458 conn
459 );
460 }
461 Ok::<(), IndexerError>(())
462 },
463 PG_DB_COMMIT_SLEEP_DURATION
464 )
465 .tap_ok(|_| {
466 let elapsed = guard.stop_and_record();
467 info!(
468 elapsed,
469 "Persisted {} chunked objects snapshot",
470 objects_snapshot.len(),
471 );
472 })
473 .tap_err(|e| {
474 tracing::error!("Failed to persist object snapshot with error: {}", e);
475 })
476 }
477
478 fn persist_objects_history_chunk(
479 &self,
480 stored_objects_history: Vec<StoredHistoryObject>,
481 ) -> Result<(), IndexerError> {
482 let guard = self
483 .metrics
484 .checkpoint_db_commit_latency_objects_history_chunks
485 .start_timer();
486 transactional_blocking_with_retry!(
487 &self.blocking_cp,
488 |conn| {
489 for stored_objects_history_chunk in
490 stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
491 {
492 insert_or_ignore_into!(
493 objects_history::table,
494 stored_objects_history_chunk,
495 conn
496 );
497 }
498 Ok::<(), IndexerError>(())
499 },
500 PG_DB_COMMIT_SLEEP_DURATION
501 )
502 .tap_ok(|_| {
503 let elapsed = guard.stop_and_record();
504 info!(
505 elapsed,
506 "Persisted {} chunked objects history",
507 stored_objects_history.len(),
508 );
509 })
510 .tap_err(|e| {
511 tracing::error!("Failed to persist object history with error: {}", e);
512 })
513 }
514
515 fn persist_object_version_chunk(
516 &self,
517 object_versions: Vec<StoredObjectVersion>,
518 ) -> Result<(), IndexerError> {
519 transactional_blocking_with_retry!(
520 &self.blocking_cp,
521 |conn| {
522 for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
523 {
524 insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
525 }
526 Ok::<(), IndexerError>(())
527 },
528 PG_DB_COMMIT_SLEEP_DURATION
529 )
530 .tap_ok(|_| {
531 info!(
532 "Persisted {} chunked object versions",
533 object_versions.len(),
534 );
535 })
536 .tap_err(|e| {
537 tracing::error!("Failed to persist object version chunk with error: {}", e);
538 })
539 }
540
541 fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
542 let Some(first_checkpoint) = checkpoints.first() else {
543 return Ok(());
544 };
545
546 if first_checkpoint.sequence_number == 0 {
549 let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
550 self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
551 transactional_blocking_with_retry!(
552 &self.blocking_cp,
553 |conn| {
554 let checkpoint_digest =
555 first_checkpoint.checkpoint_digest.into_inner().to_vec();
556 insert_or_ignore_into!(
557 chain_identifier::table,
558 StoredChainIdentifier { checkpoint_digest },
559 conn
560 );
561 Ok::<(), IndexerError>(())
562 },
563 PG_DB_COMMIT_SLEEP_DURATION
564 )?;
565 }
566 let guard = self
567 .metrics
568 .checkpoint_db_commit_latency_checkpoints
569 .start_timer();
570
571 let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
572 transactional_blocking_with_retry!(
573 &self.blocking_cp,
574 |conn| {
575 for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
576 insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
577 }
578 Ok::<(), IndexerError>(())
579 },
580 PG_DB_COMMIT_SLEEP_DURATION
581 )
582 .tap_ok(|_| {
583 info!(
584 "Persisted {} pruner_cp_watermark rows.",
585 stored_cp_txs.len(),
586 );
587 })
588 .tap_err(|e| {
589 tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
590 })?;
591
592 let stored_checkpoints = checkpoints
593 .iter()
594 .map(StoredCheckpoint::from)
595 .collect::<Vec<_>>();
596 transactional_blocking_with_retry!(
597 &self.blocking_cp,
598 |conn| {
599 for stored_checkpoint_chunk in
600 stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
601 {
602 insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
603 let time_now_ms = chrono::Utc::now().timestamp_millis();
604 for stored_checkpoint in stored_checkpoint_chunk {
605 self.metrics
606 .db_commit_lag_ms
607 .set(time_now_ms - stored_checkpoint.timestamp_ms);
608 self.metrics.max_committed_checkpoint_sequence_number.set(
609 stored_checkpoint.sequence_number,
610 );
611 self.metrics.committed_checkpoint_timestamp_ms.set(
612 stored_checkpoint.timestamp_ms,
613 );
614 }
615 for stored_checkpoint in stored_checkpoint_chunk {
616 info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
617 }
618 }
619 Ok::<(), IndexerError>(())
620 },
621 PG_DB_COMMIT_SLEEP_DURATION
622 )
623 .tap_ok(|_| {
624 let elapsed = guard.stop_and_record();
625 info!(
626 elapsed,
627 "Persisted {} checkpoints",
628 stored_checkpoints.len()
629 );
630 })
631 .tap_err(|e| {
632 tracing::error!("Failed to persist checkpoints with error: {}", e);
633 })
634 }
635
636 fn persist_transactions_chunk(
637 &self,
638 transactions: Vec<IndexedTransaction>,
639 ) -> Result<(), IndexerError> {
640 let guard = self
641 .metrics
642 .checkpoint_db_commit_latency_transactions_chunks
643 .start_timer();
644 let transformation_guard = self
645 .metrics
646 .checkpoint_db_commit_latency_transactions_chunks_transformation
647 .start_timer();
648 let transactions = transactions
649 .iter()
650 .map(StoredTransaction::from)
651 .collect::<Vec<_>>();
652 drop(transformation_guard);
653
654 transactional_blocking_with_retry!(
655 &self.blocking_cp,
656 |conn| {
657 for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
658 insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
659 }
660 Ok::<(), IndexerError>(())
661 },
662 PG_DB_COMMIT_SLEEP_DURATION
663 )
664 .tap_ok(|_| {
665 let elapsed = guard.stop_and_record();
666 info!(
667 elapsed,
668 "Persisted {} chunked transactions",
669 transactions.len()
670 );
671 })
672 .tap_err(|e| {
673 tracing::error!("Failed to persist transactions with error: {}", e);
674 })
675 }
676
677 fn persist_tx_insertion_order_chunk(
678 &self,
679 tx_order: Vec<TxInsertionOrder>,
680 ) -> Result<(), IndexerError> {
681 let guard = self
682 .metrics
683 .checkpoint_db_commit_latency_tx_insertion_order_chunks
684 .start_timer();
685
686 transactional_blocking_with_retry!(
687 &self.blocking_cp,
688 |conn| {
689 for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
690 insert_or_ignore_into!(tx_insertion_order::table, tx_order_chunk, conn);
691 }
692 Ok::<(), IndexerError>(())
693 },
694 PG_DB_COMMIT_SLEEP_DURATION
695 )
696 .tap_ok(|_| {
697 let elapsed = guard.stop_and_record();
698 info!(
699 elapsed,
700 "Persisted {} chunked txs insertion order",
701 tx_order.len()
702 );
703 })
704 .tap_err(|e| {
705 tracing::error!("Failed to persist txs insertion order with error: {e}");
706 })
707 }
708
709 fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
710 let guard = self
711 .metrics
712 .checkpoint_db_commit_latency_events_chunks
713 .start_timer();
714 let len = events.len();
715 let events = events
716 .into_iter()
717 .map(StoredEvent::from)
718 .collect::<Vec<_>>();
719
720 transactional_blocking_with_retry!(
721 &self.blocking_cp,
722 |conn| {
723 for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
724 insert_or_ignore_into!(events::table, event_chunk, conn);
725 }
726 Ok::<(), IndexerError>(())
727 },
728 PG_DB_COMMIT_SLEEP_DURATION
729 )
730 .tap_ok(|_| {
731 let elapsed = guard.stop_and_record();
732 info!(elapsed, "Persisted {} chunked events", len);
733 })
734 .tap_err(|e| {
735 tracing::error!("Failed to persist events with error: {}", e);
736 })
737 }
738
739 fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
740 if packages.is_empty() {
741 return Ok(());
742 }
743 let guard = self
744 .metrics
745 .checkpoint_db_commit_latency_packages
746 .start_timer();
747 let packages = packages
748 .into_iter()
749 .map(StoredPackage::from)
750 .collect::<Vec<_>>();
751 transactional_blocking_with_retry!(
752 &self.blocking_cp,
753 |conn| {
754 for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
755 on_conflict_do_update!(
756 packages::table,
757 packages_chunk,
758 packages::package_id,
759 (
760 packages::package_id.eq(excluded(packages::package_id)),
761 packages::move_package.eq(excluded(packages::move_package)),
762 ),
763 conn
764 );
765 }
766 Ok::<(), IndexerError>(())
767 },
768 PG_DB_COMMIT_SLEEP_DURATION
769 )
770 .tap_ok(|_| {
771 let elapsed = guard.stop_and_record();
772 info!(elapsed, "Persisted {} packages", packages.len());
773 })
774 .tap_err(|e| {
775 tracing::error!("Failed to persist packages with error: {}", e);
776 })
777 }
778
779 async fn persist_event_indices_chunk(
780 &self,
781 indices: Vec<EventIndex>,
782 ) -> Result<(), IndexerError> {
783 let guard = self
784 .metrics
785 .checkpoint_db_commit_latency_event_indices_chunks
786 .start_timer();
787 let len = indices.len();
788 let (
789 event_emit_packages,
790 event_emit_modules,
791 event_senders,
792 event_struct_packages,
793 event_struct_modules,
794 event_struct_names,
795 event_struct_instantiations,
796 ) = indices.into_iter().map(|i| i.split()).fold(
797 (
798 Vec::new(),
799 Vec::new(),
800 Vec::new(),
801 Vec::new(),
802 Vec::new(),
803 Vec::new(),
804 Vec::new(),
805 ),
806 |(
807 mut event_emit_packages,
808 mut event_emit_modules,
809 mut event_senders,
810 mut event_struct_packages,
811 mut event_struct_modules,
812 mut event_struct_names,
813 mut event_struct_instantiations,
814 ),
815 index| {
816 event_emit_packages.push(index.0);
817 event_emit_modules.push(index.1);
818 event_senders.push(index.2);
819 event_struct_packages.push(index.3);
820 event_struct_modules.push(index.4);
821 event_struct_names.push(index.5);
822 event_struct_instantiations.push(index.6);
823 (
824 event_emit_packages,
825 event_emit_modules,
826 event_senders,
827 event_struct_packages,
828 event_struct_modules,
829 event_struct_names,
830 event_struct_instantiations,
831 )
832 },
833 );
834
835 let mut futures = vec![];
837 futures.push(self.spawn_blocking_task(move |this| {
838 persist_chunk_into_table!(
839 event_emit_package::table,
840 event_emit_packages,
841 &this.blocking_cp
842 )
843 }));
844
845 futures.push(self.spawn_blocking_task(move |this| {
846 persist_chunk_into_table!(
847 event_emit_module::table,
848 event_emit_modules,
849 &this.blocking_cp
850 )
851 }));
852
853 futures.push(self.spawn_blocking_task(move |this| {
854 persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
855 }));
856
857 futures.push(self.spawn_blocking_task(move |this| {
858 persist_chunk_into_table!(
859 event_struct_package::table,
860 event_struct_packages,
861 &this.blocking_cp
862 )
863 }));
864
865 futures.push(self.spawn_blocking_task(move |this| {
866 persist_chunk_into_table!(
867 event_struct_module::table,
868 event_struct_modules,
869 &this.blocking_cp
870 )
871 }));
872
873 futures.push(self.spawn_blocking_task(move |this| {
874 persist_chunk_into_table!(
875 event_struct_name::table,
876 event_struct_names,
877 &this.blocking_cp
878 )
879 }));
880
881 futures.push(self.spawn_blocking_task(move |this| {
882 persist_chunk_into_table!(
883 event_struct_instantiation::table,
884 event_struct_instantiations,
885 &this.blocking_cp
886 )
887 }));
888
889 futures::future::try_join_all(futures)
890 .await
891 .map_err(|e| {
892 tracing::error!("Failed to join event indices futures in a chunk: {}", e);
893 IndexerError::from(e)
894 })?
895 .into_iter()
896 .collect::<Result<Vec<_>, _>>()
897 .map_err(|e| {
898 IndexerError::PostgresWrite(format!(
899 "Failed to persist all event indices in a chunk: {:?}",
900 e
901 ))
902 })?;
903 let elapsed = guard.stop_and_record();
904 info!(elapsed, "Persisted {} chunked event indices", len);
905 Ok(())
906 }
907
908 async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
909 let guard = self
910 .metrics
911 .checkpoint_db_commit_latency_tx_indices_chunks
912 .start_timer();
913 let len = indices.len();
914 let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
915 indices.into_iter().map(|i| i.split()).fold(
916 (
917 Vec::new(),
918 Vec::new(),
919 Vec::new(),
920 Vec::new(),
921 Vec::new(),
922 Vec::new(),
923 Vec::new(),
924 Vec::new(),
925 Vec::new(),
926 ),
927 |(
928 mut tx_senders,
929 mut tx_recipients,
930 mut tx_input_objects,
931 mut tx_changed_objects,
932 mut tx_pkgs,
933 mut tx_mods,
934 mut tx_funs,
935 mut tx_digests,
936 mut tx_kinds,
937 ),
938 index| {
939 tx_senders.extend(index.0);
940 tx_recipients.extend(index.1);
941 tx_input_objects.extend(index.2);
942 tx_changed_objects.extend(index.3);
943 tx_pkgs.extend(index.4);
944 tx_mods.extend(index.5);
945 tx_funs.extend(index.6);
946 tx_digests.extend(index.7);
947 tx_kinds.extend(index.8);
948 (
949 tx_senders,
950 tx_recipients,
951 tx_input_objects,
952 tx_changed_objects,
953 tx_pkgs,
954 tx_mods,
955 tx_funs,
956 tx_digests,
957 tx_kinds,
958 )
959 },
960 );
961
962 let mut futures = vec![];
963 futures.push(self.spawn_blocking_task(move |this| {
964 let now = Instant::now();
965 let senders_len = senders.len();
966 let recipients_len = recipients.len();
967 transactional_blocking_with_retry!(
968 &this.blocking_cp,
969 |conn| {
970 for chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
971 insert_or_ignore_into!(tx_senders::table, chunk, conn);
972 }
973 for chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
974 insert_or_ignore_into!(tx_recipients::table, chunk, conn);
975 }
976 Ok::<(), IndexerError>(())
977 },
978 PG_DB_COMMIT_SLEEP_DURATION
979 )
980 .tap_ok(|_| {
981 let elapsed = now.elapsed().as_secs_f64();
982 info!(
983 elapsed,
984 "Persisted {} rows to tx_senders and {} rows to tx_recipients",
985 senders_len,
986 recipients_len,
987 );
988 })
989 .tap_err(|e| {
990 tracing::error!(
991 "Failed to persist tx_senders and tx_recipients with error: {}",
992 e
993 );
994 })
995 }));
996
997 futures.push(self.spawn_blocking_task(move |this| {
998 let now = Instant::now();
999 let input_objects_len = input_objects.len();
1000 transactional_blocking_with_retry!(
1001 &this.blocking_cp,
1002 |conn| {
1003 for chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1004 insert_or_ignore_into!(tx_input_objects::table, chunk, conn);
1005 }
1006 Ok::<(), IndexerError>(())
1007 },
1008 PG_DB_COMMIT_SLEEP_DURATION
1009 )
1010 .tap_ok(|_| {
1011 let elapsed = now.elapsed().as_secs_f64();
1012 info!(
1013 elapsed,
1014 "Persisted {} rows to tx_input_objects", input_objects_len,
1015 );
1016 })
1017 .tap_err(|e| {
1018 tracing::error!("Failed to persist tx_input_objects with error: {}", e);
1019 })
1020 }));
1021
1022 futures.push(self.spawn_blocking_task(move |this| {
1023 let now = Instant::now();
1024 let changed_objects_len = changed_objects.len();
1025 transactional_blocking_with_retry!(
1026 &this.blocking_cp,
1027 |conn| {
1028 for chunk in changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1029 insert_or_ignore_into!(tx_changed_objects::table, chunk, conn);
1030 }
1031 Ok::<(), IndexerError>(())
1032 },
1033 PG_DB_COMMIT_SLEEP_DURATION
1034 )
1035 .tap_ok(|_| {
1036 let elapsed = now.elapsed().as_secs_f64();
1037 info!(
1038 elapsed,
1039 "Persisted {} rows to tx_changed_objects table", changed_objects_len,
1040 );
1041 })
1042 .tap_err(|e| {
1043 tracing::error!("Failed to persist tx_changed_objects with error: {}", e);
1044 })
1045 }));
1046
1047 futures.push(self.spawn_blocking_task(move |this| {
1048 let now = Instant::now();
1049 let rows_len = pkgs.len();
1050 transactional_blocking_with_retry!(
1051 &this.blocking_cp,
1052 |conn| {
1053 for chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1054 insert_or_ignore_into!(tx_calls_pkg::table, chunk, conn);
1055 }
1056 Ok::<(), IndexerError>(())
1057 },
1058 PG_DB_COMMIT_SLEEP_DURATION
1059 )
1060 .tap_ok(|_| {
1061 let elapsed = now.elapsed().as_secs_f64();
1062 info!(
1063 elapsed,
1064 "Persisted {} rows to tx_calls_pkg tables", rows_len
1065 );
1066 })
1067 .tap_err(|e| {
1068 tracing::error!("Failed to persist tx_calls_pkg with error: {}", e);
1069 })
1070 }));
1071
1072 futures.push(self.spawn_blocking_task(move |this| {
1073 let now = Instant::now();
1074 let rows_len = mods.len();
1075 transactional_blocking_with_retry!(
1076 &this.blocking_cp,
1077 |conn| {
1078 for chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1079 insert_or_ignore_into!(tx_calls_mod::table, chunk, conn);
1080 }
1081 Ok::<(), IndexerError>(())
1082 },
1083 PG_DB_COMMIT_SLEEP_DURATION
1084 )
1085 .tap_ok(|_| {
1086 let elapsed = now.elapsed().as_secs_f64();
1087 info!(elapsed, "Persisted {} rows to tx_calls_mod table", rows_len);
1088 })
1089 .tap_err(|e| {
1090 tracing::error!("Failed to persist tx_calls_mod with error: {}", e);
1091 })
1092 }));
1093
1094 futures.push(self.spawn_blocking_task(move |this| {
1095 let now = Instant::now();
1096 let rows_len = funs.len();
1097 transactional_blocking_with_retry!(
1098 &this.blocking_cp,
1099 |conn| {
1100 for chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1101 insert_or_ignore_into!(tx_calls_fun::table, chunk, conn);
1102 }
1103 Ok::<(), IndexerError>(())
1104 },
1105 PG_DB_COMMIT_SLEEP_DURATION
1106 )
1107 .tap_ok(|_| {
1108 let elapsed = now.elapsed().as_secs_f64();
1109 info!(elapsed, "Persisted {} rows to tx_calls_fun table", rows_len);
1110 })
1111 .tap_err(|e| {
1112 tracing::error!("Failed to persist tx_calls_fun with error: {}", e);
1113 })
1114 }));
1115
1116 futures.push(self.spawn_blocking_task(move |this| {
1117 let now = Instant::now();
1118 let calls_len = digests.len();
1119 transactional_blocking_with_retry!(
1120 &this.blocking_cp,
1121 |conn| {
1122 for chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1123 insert_or_ignore_into!(tx_digests::table, chunk, conn);
1124 }
1125 Ok::<(), IndexerError>(())
1126 },
1127 Duration::from_secs(60)
1128 )
1129 .tap_ok(|_| {
1130 let elapsed = now.elapsed().as_secs_f64();
1131 info!(elapsed, "Persisted {} rows to tx_digests tables", calls_len);
1132 })
1133 .tap_err(|e| {
1134 tracing::error!("Failed to persist tx_digests with error: {}", e);
1135 })
1136 }));
1137
1138 futures.push(self.spawn_blocking_task(move |this| {
1139 let now = Instant::now();
1140 let rows_len = kinds.len();
1141 transactional_blocking_with_retry!(
1142 &this.blocking_cp,
1143 |conn| {
1144 for chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1145 insert_or_ignore_into!(tx_kinds::table, chunk, conn);
1146 }
1147 Ok::<(), IndexerError>(())
1148 },
1149 Duration::from_secs(60)
1150 )
1151 .tap_ok(|_| {
1152 let elapsed = now.elapsed().as_secs_f64();
1153 info!(elapsed, "Persisted {} rows to tx_kinds tables", rows_len);
1154 })
1155 .tap_err(|e| {
1156 tracing::error!("Failed to persist tx_kinds with error: {}", e);
1157 })
1158 }));
1159
1160 futures::future::try_join_all(futures)
1161 .await
1162 .map_err(|e| {
1163 tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
1164 IndexerError::from(e)
1165 })?
1166 .into_iter()
1167 .collect::<Result<Vec<_>, _>>()
1168 .map_err(|e| {
1169 IndexerError::PostgresWrite(format!(
1170 "Failed to persist all tx indices in a chunk: {:?}",
1171 e
1172 ))
1173 })?;
1174 let elapsed = guard.stop_and_record();
1175 info!(elapsed, "Persisted {} chunked tx_indices", len);
1176 Ok(())
1177 }
1178
1179 fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1180 let guard = self
1181 .metrics
1182 .checkpoint_db_commit_latency_epoch
1183 .start_timer();
1184 let epoch_id = epoch.new_epoch.epoch;
1185
1186 transactional_blocking_with_retry!(
1187 &self.blocking_cp,
1188 |conn| {
1189 if let Some(last_epoch) = &epoch.last_epoch {
1190 let last_epoch_id = last_epoch.epoch;
1191 let previous_epoch_network_total_transactions = match epoch_id {
1196 0 | 1 => 0,
1197 _ => {
1198 let prev_epoch_id = epoch_id - 2;
1199 let result = checkpoints::table
1200 .filter(checkpoints::epoch.eq(prev_epoch_id as i64))
1201 .select(max(checkpoints::network_total_transactions))
1202 .first::<Option<i64>>(conn)
1203 .map(|o| o.unwrap_or(0))?;
1204
1205 result as u64
1206 }
1207 };
1208
1209 let epoch_total_transactions = epoch.network_total_transactions
1210 - previous_epoch_network_total_transactions;
1211
1212 let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
1213 last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64);
1214 info!(last_epoch_id, "Persisting epoch end data.");
1215 on_conflict_do_update!(
1216 epochs::table,
1217 vec![last_epoch],
1218 epochs::epoch,
1219 (
1220 epochs::system_state.eq(excluded(epochs::system_state)),
1225 epochs::epoch_total_transactions
1226 .eq(excluded(epochs::epoch_total_transactions)),
1227 epochs::last_checkpoint_id.eq(excluded(epochs::last_checkpoint_id)),
1228 epochs::epoch_end_timestamp.eq(excluded(epochs::epoch_end_timestamp)),
1229 epochs::storage_charge.eq(excluded(epochs::storage_charge)),
1230 epochs::storage_rebate.eq(excluded(epochs::storage_rebate)),
1231 epochs::total_gas_fees.eq(excluded(epochs::total_gas_fees)),
1232 epochs::total_stake_rewards_distributed
1233 .eq(excluded(epochs::total_stake_rewards_distributed)),
1234 epochs::epoch_commitments.eq(excluded(epochs::epoch_commitments)),
1235 epochs::burnt_tokens_amount.eq(excluded(epochs::burnt_tokens_amount)),
1236 epochs::minted_tokens_amount.eq(excluded(epochs::minted_tokens_amount)),
1237 ),
1238 conn
1239 );
1240 }
1241
1242 let epoch_id = epoch.new_epoch.epoch;
1243 info!(epoch_id, "Persisting epoch beginning info");
1244 let new_epoch = StoredEpochInfo::from_epoch_beginning_info(&epoch.new_epoch);
1245 insert_or_ignore_into!(epochs::table, new_epoch, conn);
1246 Ok::<(), IndexerError>(())
1247 },
1248 PG_DB_COMMIT_SLEEP_DURATION
1249 )
1250 .tap_ok(|_| {
1251 let elapsed = guard.stop_and_record();
1252 info!(elapsed, epoch_id, "Persisted epoch beginning info");
1253 })
1254 .tap_err(|e| {
1255 tracing::error!("Failed to persist epoch with error: {}", e);
1256 })
1257 }
1258
1259 fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1260 let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1261 if let Some(last_epoch_id) = last_epoch_id {
1263 let last_db_epoch: Option<StoredEpochInfo> =
1264 read_only_blocking!(&self.blocking_cp, |conn| {
1265 epochs::table
1266 .filter(epochs::epoch.eq(last_epoch_id as i64))
1267 .first::<StoredEpochInfo>(conn)
1268 .optional()
1269 })
1270 .context("Failed to read last epoch from PostgresDB")?;
1271 if let Some(last_epoch) = last_db_epoch {
1272 let epoch_partition_data =
1273 EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1274 let table_partitions = self.partition_manager.get_table_partitions()?;
1275 for (table, (_, last_partition)) in table_partitions {
1276 if !self
1278 .partition_manager
1279 .get_strategy(&table)
1280 .is_epoch_partitioned()
1281 {
1282 continue;
1283 }
1284 let guard = self.metrics.advance_epoch_latency.start_timer();
1285 self.partition_manager.advance_epoch(
1286 table.clone(),
1287 last_partition,
1288 &epoch_partition_data,
1289 )?;
1290 let elapsed = guard.stop_and_record();
1291 info!(
1292 elapsed,
1293 "Advanced epoch partition {} for table {}",
1294 last_partition,
1295 table.clone()
1296 );
1297 }
1298 } else {
1299 tracing::error!("Last epoch: {} from PostgresDB is None.", last_epoch_id);
1300 }
1301 }
1302
1303 Ok(())
1304 }
1305
1306 fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
1307 transactional_blocking_with_retry!(
1308 &self.blocking_cp,
1309 |conn| {
1310 diesel::delete(
1311 checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
1312 )
1313 .execute(conn)
1314 .map_err(IndexerError::from)
1315 .context("Failed to prune checkpoints table")?;
1316
1317 Ok::<(), IndexerError>(())
1318 },
1319 PG_DB_COMMIT_SLEEP_DURATION
1320 )
1321 }
1322
1323 fn prune_epochs_table(&self, epoch: u64) -> Result<(), IndexerError> {
1324 transactional_blocking_with_retry!(
1325 &self.blocking_cp,
1326 |conn| {
1327 diesel::delete(epochs::table.filter(epochs::epoch.eq(epoch as i64)))
1328 .execute(conn)
1329 .map_err(IndexerError::from)
1330 .context("Failed to prune epochs table")?;
1331 Ok::<(), IndexerError>(())
1332 },
1333 PG_DB_COMMIT_SLEEP_DURATION
1334 )
1335 }
1336
1337 fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1338 let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1339 transactional_blocking_with_retry!(
1340 &self.blocking_cp,
1341 |conn| {
1342 prune_tx_or_event_indice_table!(
1343 event_emit_module,
1344 conn,
1345 min_tx,
1346 max_tx,
1347 "Failed to prune event_emit_module table"
1348 );
1349 prune_tx_or_event_indice_table!(
1350 event_emit_package,
1351 conn,
1352 min_tx,
1353 max_tx,
1354 "Failed to prune event_emit_package table"
1355 );
1356 prune_tx_or_event_indice_table![
1357 event_senders,
1358 conn,
1359 min_tx,
1360 max_tx,
1361 "Failed to prune event_senders table"
1362 ];
1363 prune_tx_or_event_indice_table![
1364 event_struct_instantiation,
1365 conn,
1366 min_tx,
1367 max_tx,
1368 "Failed to prune event_struct_instantiation table"
1369 ];
1370 prune_tx_or_event_indice_table![
1371 event_struct_module,
1372 conn,
1373 min_tx,
1374 max_tx,
1375 "Failed to prune event_struct_module table"
1376 ];
1377 prune_tx_or_event_indice_table![
1378 event_struct_name,
1379 conn,
1380 min_tx,
1381 max_tx,
1382 "Failed to prune event_struct_name table"
1383 ];
1384 prune_tx_or_event_indice_table![
1385 event_struct_package,
1386 conn,
1387 min_tx,
1388 max_tx,
1389 "Failed to prune event_struct_package table"
1390 ];
1391 Ok::<(), IndexerError>(())
1392 },
1393 PG_DB_COMMIT_SLEEP_DURATION
1394 )
1395 }
1396
1397 fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1398 let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1399 transactional_blocking_with_retry!(
1400 &self.blocking_cp,
1401 |conn| {
1402 prune_tx_or_event_indice_table!(
1403 tx_senders,
1404 conn,
1405 min_tx,
1406 max_tx,
1407 "Failed to prune tx_senders table"
1408 );
1409 prune_tx_or_event_indice_table!(
1410 tx_recipients,
1411 conn,
1412 min_tx,
1413 max_tx,
1414 "Failed to prune tx_recipients table"
1415 );
1416 prune_tx_or_event_indice_table![
1417 tx_input_objects,
1418 conn,
1419 min_tx,
1420 max_tx,
1421 "Failed to prune tx_input_objects table"
1422 ];
1423 prune_tx_or_event_indice_table![
1424 tx_changed_objects,
1425 conn,
1426 min_tx,
1427 max_tx,
1428 "Failed to prune tx_changed_objects table"
1429 ];
1430 prune_tx_or_event_indice_table![
1431 tx_calls_pkg,
1432 conn,
1433 min_tx,
1434 max_tx,
1435 "Failed to prune tx_calls_pkg table"
1436 ];
1437 prune_tx_or_event_indice_table![
1438 tx_calls_mod,
1439 conn,
1440 min_tx,
1441 max_tx,
1442 "Failed to prune tx_calls_mod table"
1443 ];
1444 prune_tx_or_event_indice_table![
1445 tx_calls_fun,
1446 conn,
1447 min_tx,
1448 max_tx,
1449 "Failed to prune tx_calls_fun table"
1450 ];
1451 prune_tx_or_event_indice_table![
1452 tx_digests,
1453 conn,
1454 min_tx,
1455 max_tx,
1456 "Failed to prune tx_digests table"
1457 ];
1458 Ok::<(), IndexerError>(())
1459 },
1460 PG_DB_COMMIT_SLEEP_DURATION
1461 )
1462 }
1463
1464 fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1465 transactional_blocking_with_retry!(
1466 &self.blocking_cp,
1467 |conn| {
1468 diesel::delete(
1469 pruner_cp_watermark::table
1470 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1471 )
1472 .execute(conn)
1473 .map_err(IndexerError::from)
1474 .context("Failed to prune pruner_cp_watermark table")?;
1475 Ok::<(), IndexerError>(())
1476 },
1477 PG_DB_COMMIT_SLEEP_DURATION
1478 )
1479 }
1480
1481 fn get_network_total_transactions_by_end_of_epoch(
1482 &self,
1483 epoch: u64,
1484 ) -> Result<u64, IndexerError> {
1485 read_only_blocking!(&self.blocking_cp, |conn| {
1486 checkpoints::table
1487 .filter(checkpoints::epoch.eq(epoch as i64))
1488 .select(checkpoints::network_total_transactions)
1489 .order_by(checkpoints::sequence_number.desc())
1490 .first::<i64>(conn)
1491 })
1492 .context("Failed to get network total transactions in epoch")
1493 .map(|v| v as u64)
1494 }
1495
1496 fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1497 transactional_blocking_with_retry!(
1498 &self.blocking_cp,
1499 |conn| {
1500 diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1501 .execute(conn)?;
1502 Ok::<(), IndexerError>(())
1503 },
1504 PG_DB_COMMIT_SLEEP_DURATION
1505 )
1506 .tap_ok(|_| {
1507 info!("Successfully refreshed participation_metrics");
1508 })
1509 .tap_err(|e| {
1510 tracing::error!("Failed to refresh participation_metrics: {e}");
1511 })
1512 }
1513
1514 async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1515 where
1516 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1517 R: Send + 'static,
1518 {
1519 let this = self.clone();
1520 let current_span = tracing::Span::current();
1521 tokio::task::spawn_blocking(move || {
1522 let _guard = current_span.enter();
1523 f(this)
1524 })
1525 .await
1526 .map_err(Into::into)
1527 .and_then(std::convert::identity)
1528 }
1529
1530 fn spawn_blocking_task<F, R>(
1531 &self,
1532 f: F,
1533 ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1534 where
1535 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1536 R: Send + 'static,
1537 {
1538 let this = self.clone();
1539 let current_span = tracing::Span::current();
1540 let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1541 tokio::task::spawn_blocking(move || {
1542 let _guard = current_span.enter();
1543 let _elapsed = guard.stop_and_record();
1544 f(this)
1545 })
1546 }
1547
1548 fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1549 where
1550 F: FnOnce(Self) -> Fut + Send + 'static,
1551 Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1552 R: Send + 'static,
1553 {
1554 let this = self.clone();
1555 tokio::task::spawn(async move { f(this).await })
1556 }
1557}
1558
1559#[async_trait]
1560impl IndexerStore for PgIndexerStore {
1561 async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1562 self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1563 .await
1564 }
1565
1566 async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1567 self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1568 .await
1569 }
1570
1571 async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1572 self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1573 .await
1574 }
1575
1576 async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1577 self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1578 .await
1579 }
1580
1581 async fn get_latest_object_snapshot_checkpoint_sequence_number(
1582 &self,
1583 ) -> Result<Option<u64>, IndexerError> {
1584 self.execute_in_blocking_worker(|this| {
1585 this.get_latest_object_snapshot_checkpoint_sequence_number()
1586 })
1587 .await
1588 }
1589
1590 async fn persist_objects(
1591 &self,
1592 object_changes: Vec<TransactionObjectChangesToCommit>,
1593 ) -> Result<(), IndexerError> {
1594 if object_changes.is_empty() {
1595 return Ok(());
1596 }
1597 let guard = self
1598 .metrics
1599 .checkpoint_db_commit_latency_objects
1600 .start_timer();
1601 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1602 let object_mutations = indexed_mutations
1603 .into_iter()
1604 .map(StoredObject::from)
1605 .collect::<Vec<_>>();
1606 let object_deletions = indexed_deletions
1607 .into_iter()
1608 .map(StoredDeletedObject::from)
1609 .collect::<Vec<_>>();
1610 let mutation_len = object_mutations.len();
1611 let deletion_len = object_deletions.len();
1612
1613 let object_mutation_chunks =
1614 chunk!(object_mutations, self.config.parallel_objects_chunk_size);
1615 let object_deletion_chunks =
1616 chunk!(object_deletions, self.config.parallel_objects_chunk_size);
1617 let mutation_futures = object_mutation_chunks
1618 .into_iter()
1619 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_mutation_chunk(c)));
1620 futures::future::try_join_all(mutation_futures)
1621 .await
1622 .map_err(|e| {
1623 tracing::error!(
1624 "Failed to join persist_object_mutation_chunk futures: {}",
1625 e
1626 );
1627 IndexerError::from(e)
1628 })?
1629 .into_iter()
1630 .collect::<Result<Vec<_>, _>>()
1631 .map_err(|e| {
1632 IndexerError::PostgresWrite(format!(
1633 "Failed to persist all object mutation chunks: {:?}",
1634 e
1635 ))
1636 })?;
1637 let deletion_futures = object_deletion_chunks
1638 .into_iter()
1639 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_deletion_chunk(c)));
1640 futures::future::try_join_all(deletion_futures)
1641 .await
1642 .map_err(|e| {
1643 tracing::error!(
1644 "Failed to join persist_object_deletion_chunk futures: {}",
1645 e
1646 );
1647 IndexerError::from(e)
1648 })?
1649 .into_iter()
1650 .collect::<Result<Vec<_>, _>>()
1651 .map_err(|e| {
1652 IndexerError::PostgresWrite(format!(
1653 "Failed to persist all object deletion chunks: {:?}",
1654 e
1655 ))
1656 })?;
1657
1658 let elapsed = guard.stop_and_record();
1659 info!(
1660 elapsed,
1661 "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
1662 );
1663 Ok(())
1664 }
1665
1666 async fn persist_objects_snapshot(
1667 &self,
1668 object_changes: Vec<TransactionObjectChangesToCommit>,
1669 ) -> Result<(), IndexerError> {
1670 if object_changes.is_empty() {
1671 return Ok(());
1672 }
1673 let guard = self
1674 .metrics
1675 .checkpoint_db_commit_latency_objects_snapshot
1676 .start_timer();
1677 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1678 let objects_snapshot = indexed_mutations
1679 .into_iter()
1680 .map(StoredObjectSnapshot::from)
1681 .chain(
1682 indexed_deletions
1683 .into_iter()
1684 .map(StoredObjectSnapshot::from),
1685 )
1686 .collect::<Vec<_>>();
1687 let len = objects_snapshot.len();
1688 let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1689 let futures = chunks
1690 .into_iter()
1691 .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1692
1693 futures::future::try_join_all(futures)
1694 .await
1695 .map_err(|e| {
1696 tracing::error!(
1697 "Failed to join backfill_objects_snapshot_chunk futures: {}",
1698 e
1699 );
1700 IndexerError::from(e)
1701 })?
1702 .into_iter()
1703 .collect::<Result<Vec<_>, _>>()
1704 .map_err(|e| {
1705 IndexerError::PostgresWrite(format!(
1706 "Failed to persist all objects snapshot chunks: {:?}",
1707 e
1708 ))
1709 })?;
1710 let elapsed = guard.stop_and_record();
1711 info!(elapsed, "Persisted {} objects snapshot", len);
1712 Ok(())
1713 }
1714
1715 async fn persist_object_history(
1716 &self,
1717 object_changes: Vec<TransactionObjectChangesToCommit>,
1718 ) -> Result<(), IndexerError> {
1719 let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1720 .map(|val| val.eq_ignore_ascii_case("true"))
1721 .unwrap_or(false);
1722 if skip_history {
1723 info!("skipping object history");
1724 return Ok(());
1725 }
1726
1727 if object_changes.is_empty() {
1728 return Ok(());
1729 }
1730 let objects = make_objects_history_to_commit(object_changes);
1731 let guard = self
1732 .metrics
1733 .checkpoint_db_commit_latency_objects_history
1734 .start_timer();
1735
1736 let len = objects.len();
1737 let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1738 let futures = chunks
1739 .into_iter()
1740 .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1741
1742 futures::future::try_join_all(futures)
1743 .await
1744 .map_err(|e| {
1745 tracing::error!(
1746 "Failed to join persist_objects_history_chunk futures: {}",
1747 e
1748 );
1749 IndexerError::from(e)
1750 })?
1751 .into_iter()
1752 .collect::<Result<Vec<_>, _>>()
1753 .map_err(|e| {
1754 IndexerError::PostgresWrite(format!(
1755 "Failed to persist all objects history chunks: {:?}",
1756 e
1757 ))
1758 })?;
1759 let elapsed = guard.stop_and_record();
1760 info!(elapsed, "Persisted {} objects history", len);
1761 Ok(())
1762 }
1763
1764 async fn persist_object_versions(
1765 &self,
1766 object_versions: Vec<StoredObjectVersion>,
1767 ) -> Result<(), IndexerError> {
1768 if object_versions.is_empty() {
1769 return Ok(());
1770 }
1771 let object_versions_count = object_versions.len();
1772
1773 let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1774 let futures = chunks
1775 .into_iter()
1776 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1777 .collect::<Vec<_>>();
1778
1779 futures::future::try_join_all(futures)
1780 .await
1781 .map_err(|e| {
1782 tracing::error!("Failed to join persist_object_version_chunk futures: {}", e);
1783 IndexerError::from(e)
1784 })?
1785 .into_iter()
1786 .collect::<Result<Vec<_>, _>>()
1787 .map_err(|e| {
1788 IndexerError::PostgresWrite(format!(
1789 "Failed to persist all object version chunks: {:?}",
1790 e
1791 ))
1792 })?;
1793 info!("Persisted {} objects history", object_versions_count);
1794 Ok(())
1795 }
1796
1797 async fn persist_checkpoints(
1798 &self,
1799 checkpoints: Vec<IndexedCheckpoint>,
1800 ) -> Result<(), IndexerError> {
1801 self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1802 .await
1803 }
1804
1805 async fn persist_transactions(
1806 &self,
1807 transactions: Vec<IndexedTransaction>,
1808 ) -> Result<(), IndexerError> {
1809 let guard = self
1810 .metrics
1811 .checkpoint_db_commit_latency_transactions
1812 .start_timer();
1813 let len = transactions.len();
1814
1815 let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1816 let futures = chunks
1817 .into_iter()
1818 .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1819
1820 futures::future::try_join_all(futures)
1821 .await
1822 .map_err(|e| {
1823 tracing::error!("Failed to join persist_transactions_chunk futures: {}", e);
1824 IndexerError::from(e)
1825 })?
1826 .into_iter()
1827 .collect::<Result<Vec<_>, _>>()
1828 .map_err(|e| {
1829 IndexerError::PostgresWrite(format!(
1830 "Failed to persist all transactions chunks: {:?}",
1831 e
1832 ))
1833 })?;
1834 let elapsed = guard.stop_and_record();
1835 info!(elapsed, "Persisted {} transactions", len);
1836 Ok(())
1837 }
1838
1839 async fn persist_optimistic_transaction(
1840 &self,
1841 transaction: OptimisticTransaction,
1842 ) -> Result<(), IndexerError> {
1843 let insertion_order = transaction.insertion_order;
1844
1845 self.spawn_blocking_task(move |this| {
1846 transactional_blocking_with_retry!(
1847 &this.blocking_cp,
1848 |conn| {
1849 insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1850 Ok::<(), IndexerError>(())
1851 },
1852 PG_DB_COMMIT_SLEEP_DURATION
1853 )
1854 .tap_err(|e| {
1855 tracing::error!("Failed to persist transactions with error: {}", e);
1856 })
1857 })
1858 .await
1859 .map_err(|e| {
1860 IndexerError::PostgresWrite(format!(
1861 "Failed to persist optimistic transaction: {:?}",
1862 e
1863 ))
1864 })??;
1865
1866 info!("Persisted optimistic transaction {insertion_order}");
1867 Ok(())
1868 }
1869
1870 async fn persist_tx_insertion_order(
1871 &self,
1872 tx_order: Vec<TxInsertionOrder>,
1873 ) -> Result<(), IndexerError> {
1874 let guard = self
1875 .metrics
1876 .checkpoint_db_commit_latency_tx_insertion_order
1877 .start_timer();
1878 let len = tx_order.len();
1879
1880 let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
1881 let futures = chunks.into_iter().map(|c| {
1882 self.spawn_blocking_task(move |this| this.persist_tx_insertion_order_chunk(c))
1883 });
1884
1885 futures::future::try_join_all(futures)
1886 .await
1887 .map_err(|e| {
1888 tracing::error!("Failed to join persist_tx_insertion_order_chunk futures: {e}",);
1889 IndexerError::from(e)
1890 })?
1891 .into_iter()
1892 .collect::<Result<Vec<_>, _>>()
1893 .map_err(|e| {
1894 IndexerError::PostgresWrite(format!(
1895 "Failed to persist all txs insertion order chunks: {e:?}",
1896 ))
1897 })?;
1898 let elapsed = guard.stop_and_record();
1899 info!(elapsed, "Persisted {len} txs insertion orders");
1900 Ok(())
1901 }
1902
1903 async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1904 if events.is_empty() {
1905 return Ok(());
1906 }
1907 let len = events.len();
1908 let guard = self
1909 .metrics
1910 .checkpoint_db_commit_latency_events
1911 .start_timer();
1912 let chunks = chunk!(events, self.config.parallel_chunk_size);
1913 let futures = chunks
1914 .into_iter()
1915 .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1916
1917 futures::future::try_join_all(futures)
1918 .await
1919 .map_err(|e| {
1920 tracing::error!("Failed to join persist_events_chunk futures: {}", e);
1921 IndexerError::from(e)
1922 })?
1923 .into_iter()
1924 .collect::<Result<Vec<_>, _>>()
1925 .map_err(|e| {
1926 IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {:?}", e))
1927 })?;
1928 let elapsed = guard.stop_and_record();
1929 info!(elapsed, "Persisted {} events", len);
1930 Ok(())
1931 }
1932
1933 async fn persist_optimistic_events(
1934 &self,
1935 events: Vec<OptimisticEvent>,
1936 ) -> Result<(), IndexerError> {
1937 if events.is_empty() {
1938 return Ok(());
1939 }
1940
1941 self.spawn_blocking_task(move |this| {
1942 persist_chunk_into_table!(optimistic_events::table, events, &this.blocking_cp)
1943 })
1944 .await
1945 .map_err(|e| {
1946 tracing::error!(
1947 "Failed to join persist_chunk_into_table in persist_optimistic_events: {e}"
1948 );
1949 IndexerError::from(e)
1950 })?
1951 .map_err(|e| {
1952 IndexerError::PostgresWrite(format!(
1953 "Failed to persist all optimistic events chunks: {:?}",
1954 e
1955 ))
1956 })
1957 }
1958
1959 async fn persist_displays(
1960 &self,
1961 display_updates: BTreeMap<String, StoredDisplay>,
1962 ) -> Result<(), IndexerError> {
1963 if display_updates.is_empty() {
1964 return Ok(());
1965 }
1966
1967 self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
1968 .await?
1969 }
1970
1971 async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1972 if packages.is_empty() {
1973 return Ok(());
1974 }
1975 self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
1976 .await
1977 }
1978
1979 async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
1980 if indices.is_empty() {
1981 return Ok(());
1982 }
1983 let len = indices.len();
1984 let guard = self
1985 .metrics
1986 .checkpoint_db_commit_latency_event_indices
1987 .start_timer();
1988 let chunks = chunk!(indices, self.config.parallel_chunk_size);
1989
1990 let futures = chunks.into_iter().map(|chunk| {
1991 self.spawn_task(move |this: Self| async move {
1992 this.persist_event_indices_chunk(chunk).await
1993 })
1994 });
1995
1996 futures::future::try_join_all(futures)
1997 .await
1998 .map_err(|e| {
1999 tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e);
2000 IndexerError::from(e)
2001 })?
2002 .into_iter()
2003 .collect::<Result<Vec<_>, _>>()
2004 .map_err(|e| {
2005 IndexerError::PostgresWrite(format!(
2006 "Failed to persist all event_indices chunks: {:?}",
2007 e
2008 ))
2009 })?;
2010 let elapsed = guard.stop_and_record();
2011 info!(elapsed, "Persisted {} event_indices chunks", len);
2012 Ok(())
2013 }
2014
2015 async fn persist_optimistic_event_indices(
2016 &self,
2017 indices: OptimisticEventIndices,
2018 ) -> Result<(), IndexerError> {
2019 let OptimisticEventIndices {
2020 optimistic_event_emit_packages,
2021 optimistic_event_emit_modules,
2022 optimistic_event_senders,
2023 optimistic_event_struct_packages,
2024 optimistic_event_struct_modules,
2025 optimistic_event_struct_names,
2026 optimistic_event_struct_instantiations,
2027 } = indices;
2028
2029 let mut futures = vec![];
2030 futures.push(self.spawn_blocking_task(move |this| {
2031 persist_chunk_into_table!(
2032 optimistic_event_emit_package::table,
2033 optimistic_event_emit_packages,
2034 &this.blocking_cp
2035 )
2036 }));
2037
2038 futures.push(self.spawn_blocking_task(move |this| {
2039 persist_chunk_into_table!(
2040 optimistic_event_emit_module::table,
2041 optimistic_event_emit_modules,
2042 &this.blocking_cp
2043 )
2044 }));
2045
2046 futures.push(self.spawn_blocking_task(move |this| {
2047 persist_chunk_into_table!(
2048 optimistic_event_senders::table,
2049 optimistic_event_senders,
2050 &this.blocking_cp
2051 )
2052 }));
2053
2054 futures.push(self.spawn_blocking_task(move |this| {
2055 persist_chunk_into_table!(
2056 optimistic_event_struct_package::table,
2057 optimistic_event_struct_packages,
2058 &this.blocking_cp
2059 )
2060 }));
2061
2062 futures.push(self.spawn_blocking_task(move |this| {
2063 persist_chunk_into_table!(
2064 optimistic_event_struct_module::table,
2065 optimistic_event_struct_modules,
2066 &this.blocking_cp
2067 )
2068 }));
2069
2070 futures.push(self.spawn_blocking_task(move |this| {
2071 persist_chunk_into_table!(
2072 optimistic_event_struct_name::table,
2073 optimistic_event_struct_names,
2074 &this.blocking_cp
2075 )
2076 }));
2077
2078 futures.push(self.spawn_blocking_task(move |this| {
2079 persist_chunk_into_table!(
2080 optimistic_event_struct_instantiation::table,
2081 optimistic_event_struct_instantiations,
2082 &this.blocking_cp
2083 )
2084 }));
2085
2086 futures::future::try_join_all(futures)
2087 .await
2088 .map_err(|e| {
2089 tracing::error!("Failed to join optimistic event indices futures: {e}");
2090 IndexerError::from(e)
2091 })?
2092 .into_iter()
2093 .collect::<Result<Vec<_>, _>>()
2094 .map_err(|e| {
2095 IndexerError::PostgresWrite(format!(
2096 "Failed to persist all optimistic event indices: {e:?}",
2097 ))
2098 })?;
2099 info!("Persisted optimistic event indices");
2100 Ok(())
2101 }
2102
2103 async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2104 if indices.is_empty() {
2105 return Ok(());
2106 }
2107 let len = indices.len();
2108 let guard = self
2109 .metrics
2110 .checkpoint_db_commit_latency_tx_indices
2111 .start_timer();
2112 let chunks = chunk!(indices, self.config.parallel_chunk_size);
2113
2114 let futures = chunks.into_iter().map(|chunk| {
2115 self.spawn_task(
2116 move |this: Self| async move { this.persist_tx_indices_chunk(chunk).await },
2117 )
2118 });
2119 futures::future::try_join_all(futures)
2120 .await
2121 .map_err(|e| {
2122 tracing::error!("Failed to join persist_tx_indices_chunk futures: {}", e);
2123 IndexerError::from(e)
2124 })?
2125 .into_iter()
2126 .collect::<Result<Vec<_>, _>>()
2127 .map_err(|e| {
2128 IndexerError::PostgresWrite(format!(
2129 "Failed to persist all tx_indices chunks: {:?}",
2130 e
2131 ))
2132 })?;
2133 let elapsed = guard.stop_and_record();
2134 info!(elapsed, "Persisted {} tx_indices chunks", len);
2135 Ok(())
2136 }
2137
2138 async fn persist_optimistic_tx_indices(
2139 &self,
2140 indices: OptimisticTxIndices,
2141 ) -> Result<(), IndexerError> {
2142 let OptimisticTxIndices {
2143 optimistic_tx_senders: senders,
2144 optimistic_tx_recipients: recipients,
2145 optimistic_tx_input_objects: input_objects,
2146 optimistic_tx_changed_objects: changed_objects,
2147 optimistic_tx_pkgs: pkgs,
2148 optimistic_tx_mods: mods,
2149 optimistic_tx_funs: funs,
2150 optimistic_tx_kinds: kinds,
2151 } = indices;
2152
2153 let mut futures = vec![];
2154 futures.push(self.spawn_blocking_task(move |this| {
2155 persist_chunk_into_table!(optimistic_tx_senders::table, senders, &this.blocking_cp)
2156 }));
2157
2158 futures.push(self.spawn_blocking_task(move |this| {
2159 persist_chunk_into_table!(
2160 optimistic_tx_recipients::table,
2161 recipients,
2162 &this.blocking_cp
2163 )
2164 }));
2165
2166 futures.push(self.spawn_blocking_task(move |this| {
2167 persist_chunk_into_table!(
2168 optimistic_tx_input_objects::table,
2169 input_objects,
2170 &this.blocking_cp
2171 )
2172 }));
2173
2174 futures.push(self.spawn_blocking_task(move |this| {
2175 persist_chunk_into_table!(
2176 optimistic_tx_changed_objects::table,
2177 changed_objects,
2178 &this.blocking_cp
2179 )
2180 }));
2181
2182 futures.push(self.spawn_blocking_task(move |this| {
2183 persist_chunk_into_table!(optimistic_tx_calls_pkg::table, pkgs, &this.blocking_cp)
2184 }));
2185
2186 futures.push(self.spawn_blocking_task(move |this| {
2187 persist_chunk_into_table!(optimistic_tx_calls_mod::table, mods, &this.blocking_cp)
2188 }));
2189
2190 futures.push(self.spawn_blocking_task(move |this| {
2191 persist_chunk_into_table!(optimistic_tx_calls_fun::table, funs, &this.blocking_cp)
2192 }));
2193
2194 futures.push(self.spawn_blocking_task(move |this| {
2195 persist_chunk_into_table!(optimistic_tx_kinds::table, kinds, &this.blocking_cp)
2196 }));
2197
2198 futures::future::try_join_all(futures)
2199 .await
2200 .map_err(|e| {
2201 tracing::error!("Failed to join optimistic tx indices futures: {e}");
2202 IndexerError::from(e)
2203 })?
2204 .into_iter()
2205 .collect::<Result<Vec<_>, _>>()
2206 .map_err(|e| {
2207 IndexerError::PostgresWrite(format!(
2208 "Failed to persist all optimistic tx indices: {e:?}",
2209 ))
2210 })?;
2211 info!("Persisted optimistic tx indices");
2212 Ok(())
2213 }
2214
2215 async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2216 self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2217 .await
2218 }
2219
2220 async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2221 self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2222 .await
2223 }
2224
2225 async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
2226 let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
2227 (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
2228 _ => Err(IndexerError::PostgresRead(format!(
2229 "Failed to get checkpoint range for epoch {}",
2230 epoch
2231 ))),
2232 }?;
2233
2234 let min_prunable_cp = self.get_min_prunable_checkpoint()?;
2239 min_cp = std::cmp::max(min_cp, min_prunable_cp);
2240 for cp in min_cp..=max_cp {
2241 info!(
2253 "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
2254 cp, epoch, min_prunable_cp
2255 );
2256 self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
2257 .await
2258 .unwrap_or_else(|e| {
2259 tracing::error!("Failed to prune checkpoint {}: {}", cp, e);
2260 });
2261
2262 let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
2263 self.execute_in_blocking_worker(move |this| {
2264 this.prune_tx_indices_table(min_tx, max_tx)
2265 })
2266 .await
2267 .unwrap_or_else(|e| {
2268 tracing::error!("Failed to prune transactions for cp {}: {}", cp, e);
2269 });
2270 info!(
2271 "Pruned transactions for checkpoint {} from tx {} to tx {}",
2272 cp, min_tx, max_tx
2273 );
2274 self.execute_in_blocking_worker(move |this| {
2275 this.prune_event_indices_table(min_tx, max_tx)
2276 })
2277 .await
2278 .unwrap_or_else(|e| {
2279 tracing::error!(
2280 "Failed to prune events of transactions for cp {}: {}",
2281 cp,
2282 e
2283 );
2284 });
2285 info!(
2286 "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
2287 cp, min_tx, max_tx
2288 );
2289 self.metrics.last_pruned_transaction.set(max_tx as i64);
2290
2291 self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
2292 .await
2293 .unwrap_or_else(|e| {
2294 tracing::error!(
2295 "Failed to prune pruner_cp_watermark table for cp {}: {}",
2296 cp,
2297 e
2298 );
2299 });
2300 info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2301 self.metrics.last_pruned_checkpoint.set(cp as i64);
2302 }
2303
2304 self.execute_in_blocking_worker(move |this| this.prune_epochs_table(epoch))
2307 .await
2308 .unwrap_or_else(|e| {
2309 tracing::error!("Failed to prune epoch table for epoch {}: {}", epoch, e);
2310 });
2311 Ok(())
2312 }
2313
2314 async fn get_network_total_transactions_by_end_of_epoch(
2315 &self,
2316 epoch: u64,
2317 ) -> Result<u64, IndexerError> {
2318 self.execute_in_blocking_worker(move |this| {
2319 this.get_network_total_transactions_by_end_of_epoch(epoch)
2320 })
2321 .await
2322 }
2323
2324 async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2325 self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2326 .await
2327 }
2328
2329 fn as_any(&self) -> &dyn StdAny {
2330 self
2331 }
2332
2333 fn persist_protocol_configs_and_feature_flags(
2336 &self,
2337 chain_id: Vec<u8>,
2338 ) -> Result<(), IndexerError> {
2339 let chain_id = ChainIdentifier::from(
2340 CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
2341 );
2342
2343 let mut all_configs = vec![];
2344 let mut all_flags = vec![];
2345
2346 let (start_version, end_version) = self.get_protocol_version_index_range()?;
2347 info!(
2348 "Persisting protocol configs with start_version: {}, end_version: {}",
2349 start_version, end_version
2350 );
2351
2352 for version in start_version..=end_version {
2355 let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2356 (version as u64).into(),
2357 chain_id.chain(),
2358 )
2359 .ok_or(IndexerError::Generic(format!(
2360 "Unable to fetch protocol version {} and chain {:?}",
2361 version,
2362 chain_id.chain()
2363 )))?;
2364 let configs_vec = protocol_configs
2365 .attr_map()
2366 .into_iter()
2367 .map(|(k, v)| StoredProtocolConfig {
2368 protocol_version: version,
2369 config_name: k,
2370 config_value: v.map(|v| v.to_string()),
2371 })
2372 .collect::<Vec<_>>();
2373 all_configs.extend(configs_vec);
2374
2375 let feature_flags = protocol_configs
2376 .feature_map()
2377 .into_iter()
2378 .map(|(k, v)| StoredFeatureFlag {
2379 protocol_version: version,
2380 flag_name: k,
2381 flag_value: v,
2382 })
2383 .collect::<Vec<_>>();
2384 all_flags.extend(feature_flags);
2385 }
2386
2387 transactional_blocking_with_retry!(
2391 &self.blocking_cp,
2392 |conn| {
2393 insert_or_ignore_into!(protocol_configs::table, all_configs.clone(), conn);
2394 insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
2395 Ok::<(), IndexerError>(())
2396 },
2397 PG_DB_COMMIT_SLEEP_DURATION
2398 )?;
2399 Ok(())
2400 }
2401}
2402
2403fn make_objects_history_to_commit(
2404 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2405) -> Vec<StoredHistoryObject> {
2406 let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2407 .clone()
2408 .into_iter()
2409 .flat_map(|changes| changes.deleted_objects)
2410 .map(|o| o.into())
2411 .collect();
2412 let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2413 .into_iter()
2414 .flat_map(|changes| changes.changed_objects)
2415 .map(|o| o.into())
2416 .collect();
2417 deleted_objects.into_iter().chain(mutated_objects).collect()
2418}
2419
2420fn retain_latest_indexed_objects(
2426 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2427) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2428 let (mutations, deletions) = tx_object_changes
2431 .into_iter()
2432 .flat_map(|change| {
2433 change
2434 .changed_objects
2435 .into_iter()
2436 .map(Either::Left)
2437 .chain(
2438 change
2439 .deleted_objects
2440 .into_iter()
2441 .map(Either::Right),
2442 )
2443 })
2444 .fold(
2445 (HashMap::<ObjectID, IndexedObject>::new(), HashMap::<ObjectID, IndexedDeletedObject>::new()),
2446 |(mut mutations, mut deletions), either_change| {
2447 match either_change {
2448 Either::Left(mutation) => {
2452 let id = mutation.object.id();
2453 let mutation_version = mutation.object.version();
2454 if let Some(existing) = deletions.remove(&id) {
2455 assert!(
2456 existing.object_version < mutation_version.value(),
2457 "Mutation version ({mutation_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2458 existing.object_version
2459 );
2460 }
2461 if let Some(existing) = mutations.insert(id, mutation) {
2462 assert!(
2463 existing.object.version() < mutation_version,
2464 "Mutation version ({mutation_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2465 existing.object.version()
2466 );
2467 }
2468 }
2469 Either::Right(deletion) => {
2470 let id = deletion.object_id;
2471 let deletion_version = deletion.object_version;
2472 if let Some(existing) = mutations.remove(&id) {
2473 assert!(
2474 existing.object.version().value() < deletion_version,
2475 "Deletion version ({deletion_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2476 existing.object.version(),
2477 );
2478 }
2479 if let Some(existing) = deletions.insert(id, deletion) {
2480 assert!(
2481 existing.object_version < deletion_version,
2482 "Deletion version ({deletion_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2483 existing.object_version
2484 );
2485 }
2486 }
2487 }
2488 (mutations, deletions)
2489 },
2490 );
2491 (
2492 mutations.into_values().collect(),
2493 deletions.into_values().collect(),
2494 )
2495}