1use core::result::Result::Ok;
6use std::{
7 any::Any as StdAny,
8 collections::{BTreeMap, HashMap},
9 time::{Duration, Instant},
10};
11
12use async_trait::async_trait;
13use diesel::{
14 ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
15 dsl::{max, min},
16 upsert::excluded,
17};
18use downcast::Any;
19use futures::future::Either;
20use iota_protocol_config::ProtocolConfig;
21use iota_types::{
22 base_types::ObjectID,
23 digests::{ChainIdentifier, CheckpointDigest},
24};
25use itertools::Itertools;
26use tap::TapFallible;
27use tracing::info;
28
29use super::{
30 IndexerStore,
31 pg_partition_manager::{EpochPartitionData, PgPartitionManager},
32};
33use crate::{
34 db::ConnectionPool,
35 errors::{Context, IndexerError},
36 handlers::{EpochToCommit, TransactionObjectChangesToCommit},
37 insert_or_ignore_into,
38 metrics::IndexerMetrics,
39 models::{
40 checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
41 display::StoredDisplay,
42 epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
43 event_indices::OptimisticEventIndices,
44 events::{OptimisticEvent, StoredEvent},
45 obj_indices::StoredObjectVersion,
46 objects::{StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot},
47 packages::StoredPackage,
48 transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
49 tx_indices::OptimisticTxIndices,
50 },
51 on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
52 schema::{
53 chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
54 event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
55 event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
56 objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
57 optimistic_event_senders, optimistic_event_struct_instantiation,
58 optimistic_event_struct_module, optimistic_event_struct_name,
59 optimistic_event_struct_package, optimistic_events, optimistic_transactions,
60 optimistic_tx_calls_fun, optimistic_tx_calls_mod, optimistic_tx_calls_pkg,
61 optimistic_tx_changed_objects, optimistic_tx_input_objects, optimistic_tx_kinds,
62 optimistic_tx_recipients, optimistic_tx_senders, packages, protocol_configs,
63 pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
64 tx_changed_objects, tx_digests, tx_input_objects, tx_insertion_order, tx_kinds,
65 tx_recipients, tx_senders,
66 },
67 transactional_blocking_with_retry,
68 types::{
69 EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
70 IndexedPackage, IndexedTransaction, TxIndex,
71 },
72};
73
/// Splits `$data` into consecutive groups via the iterator `chunks($size)`
/// adaptor and gathers every group into its own `Vec`, yielding a
/// `Vec<Vec<_>>`.
///
/// The call site must have an iterator-level `chunks` adaptor in scope
/// (this file imports `itertools::Itertools`).
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        let grouped = $data.into_iter().chunks($size);
        grouped
            .into_iter()
            .map(|group| group.collect())
            .collect::<Vec<Vec<_>>>()
    }};
}
85
/// Deletes the rows of `$tbl` whose `tx_sequence_number` satisfies
/// `BETWEEN $first_tx AND $last_tx`, running the statement on `$connection`.
///
/// A failure is converted to `IndexerError`, annotated with
/// `$error_context`, and propagated with `?`, so the macro may only be used
/// inside a function or closure whose error type accepts `IndexerError`.
macro_rules! prune_tx_or_event_indice_table {
    ($tbl:ident, $connection:expr, $first_tx:expr, $last_tx:expr, $error_context:expr) => {
        diesel::delete(
            $tbl::table.filter($tbl::tx_sequence_number.between($first_tx, $last_tx)),
        )
        .execute($connection)
        .map_err(IndexerError::from)
        .context($error_context)?;
    };
}
94
/// Number of rows handed to a single insert/upsert statement when a larger
/// batch is written piecewise inside one database transaction.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1_000;
/// Default for `PgIndexerStoreConfig::parallel_chunk_size`; used when the
/// environment variable of the same name cannot be read.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
/// Default for `PgIndexerStoreConfig::parallel_objects_chunk_size`; used when
/// the environment variable of the same name cannot be read.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
/// One hour. Passed as the last argument of
/// `transactional_blocking_with_retry!` by the commit paths in this file.
// NOTE(review): presumably the total time budget for retries — confirm
// against the macro definition.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(60 * 60);
107
/// Tunables for [`PgIndexerStore`].
///
/// Both values are filled in by `PgIndexerStore::new`, either from the
/// environment or from the `PG_COMMIT_*_PARALLEL_CHUNK_SIZE` defaults.
#[derive(Clone, Debug)]
pub struct PgIndexerStoreConfig {
    /// Chunk size sourced from `PG_COMMIT_PARALLEL_CHUNK_SIZE`.
    pub parallel_chunk_size: usize,
    /// Chunk size sourced from `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE`.
    pub parallel_objects_chunk_size: usize,
}
113
114pub struct PgIndexerStore {
115 blocking_cp: ConnectionPool,
116 metrics: IndexerMetrics,
117 partition_manager: PgPartitionManager,
118 config: PgIndexerStoreConfig,
119}
120
121impl Clone for PgIndexerStore {
122 fn clone(&self) -> PgIndexerStore {
123 Self {
124 blocking_cp: self.blocking_cp.clone(),
125 metrics: self.metrics.clone(),
126 partition_manager: self.partition_manager.clone(),
127 config: self.config.clone(),
128 }
129 }
130}
131
132impl PgIndexerStore {
133 pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
134 let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
135 .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
136 .parse::<usize>()
137 .unwrap();
138 let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
139 .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
140 .parse::<usize>()
141 .unwrap();
142 let partition_manager = PgPartitionManager::new(blocking_cp.clone())
143 .expect("Failed to initialize partition manager");
144 let config = PgIndexerStoreConfig {
145 parallel_chunk_size,
146 parallel_objects_chunk_size,
147 };
148
149 Self {
150 blocking_cp,
151 metrics,
152 partition_manager,
153 config,
154 }
155 }
156
157 pub fn blocking_cp(&self) -> ConnectionPool {
158 self.blocking_cp.clone()
159 }
160
161 pub fn get_latest_epoch_id(&self) -> Result<Option<u64>, IndexerError> {
162 read_only_blocking!(&self.blocking_cp, |conn| {
163 epochs::dsl::epochs
164 .select(max(epochs::epoch))
165 .first::<Option<i64>>(conn)
166 .map(|v| v.map(|v| v as u64))
167 })
168 .context("Failed reading latest epoch id from PostgresDB")
169 }
170
171 pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
173 let start = read_only_blocking!(&self.blocking_cp, |conn| {
176 protocol_configs::dsl::protocol_configs
177 .select(max(protocol_configs::protocol_version))
178 .first::<Option<i64>>(conn)
179 })
180 .context("Failed reading latest protocol version from PostgresDB")?
181 .map_or(1, |v| v + 1);
182
183 let end = read_only_blocking!(&self.blocking_cp, |conn| {
185 epochs::dsl::epochs
186 .select(max(epochs::protocol_version))
187 .first::<Option<i64>>(conn)
188 })
189 .context("Failed reading latest epoch protocol version from PostgresDB")?
190 .unwrap_or(1);
191 Ok((start, end))
192 }
193
194 pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
195 read_only_blocking!(&self.blocking_cp, |conn| {
196 chain_identifier::dsl::chain_identifier
197 .select(chain_identifier::checkpoint_digest)
198 .first::<Vec<u8>>(conn)
199 .optional()
200 })
201 .context("Failed reading chain id from PostgresDB")
202 }
203
204 fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
205 read_only_blocking!(&self.blocking_cp, |conn| {
206 checkpoints::dsl::checkpoints
207 .select(max(checkpoints::sequence_number))
208 .first::<Option<i64>>(conn)
209 .map(|v| v.map(|v| v as u64))
210 })
211 .context("Failed reading latest checkpoint sequence number from PostgresDB")
212 }
213
214 fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
215 read_only_blocking!(&self.blocking_cp, |conn| {
216 checkpoints::dsl::checkpoints
217 .select((
218 min(checkpoints::sequence_number),
219 max(checkpoints::sequence_number),
220 ))
221 .first::<(Option<i64>, Option<i64>)>(conn)
222 .map(|(min, max)| {
223 (
224 min.unwrap_or_default() as u64,
225 max.unwrap_or_default() as u64,
226 )
227 })
228 })
229 .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
230 }
231
232 fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
233 read_only_blocking!(&self.blocking_cp, |conn| {
234 epochs::dsl::epochs
235 .select((min(epochs::epoch), max(epochs::epoch)))
236 .first::<(Option<i64>, Option<i64>)>(conn)
237 .map(|(min, max)| {
238 (
239 min.unwrap_or_default() as u64,
240 max.unwrap_or_default() as u64,
241 )
242 })
243 })
244 .context("Failed reading min and max epoch numbers from PostgresDB")
245 }
246
247 fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
248 read_only_blocking!(&self.blocking_cp, |conn| {
249 pruner_cp_watermark::dsl::pruner_cp_watermark
250 .select(min(pruner_cp_watermark::checkpoint_sequence_number))
251 .first::<Option<i64>>(conn)
252 .map(|v| v.unwrap_or_default() as u64)
253 })
254 .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
255 }
256
257 fn get_checkpoint_range_for_epoch(
258 &self,
259 epoch: u64,
260 ) -> Result<(u64, Option<u64>), IndexerError> {
261 read_only_blocking!(&self.blocking_cp, |conn| {
262 epochs::dsl::epochs
263 .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
264 .filter(epochs::epoch.eq(epoch as i64))
265 .first::<(i64, Option<i64>)>(conn)
266 .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
267 })
268 .context("Failed reading checkpoint range from PostgresDB")
269 }
270
271 fn get_transaction_range_for_checkpoint(
272 &self,
273 checkpoint: u64,
274 ) -> Result<(u64, u64), IndexerError> {
275 read_only_blocking!(&self.blocking_cp, |conn| {
276 pruner_cp_watermark::dsl::pruner_cp_watermark
277 .select((
278 pruner_cp_watermark::min_tx_sequence_number,
279 pruner_cp_watermark::max_tx_sequence_number,
280 ))
281 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
282 .first::<(i64, i64)>(conn)
283 .map(|(min, max)| (min as u64, max as u64))
284 })
285 .context("Failed reading transaction range from PostgresDB")
286 }
287
288 fn get_latest_object_snapshot_checkpoint_sequence_number(
289 &self,
290 ) -> Result<Option<u64>, IndexerError> {
291 read_only_blocking!(&self.blocking_cp, |conn| {
292 objects_snapshot::dsl::objects_snapshot
293 .select(max(objects_snapshot::checkpoint_sequence_number))
294 .first::<Option<i64>>(conn)
295 .map(|v| v.map(|v| v as u64))
296 })
297 .context("Failed reading latest object snapshot checkpoint sequence number from PostgresDB")
298 }
299
300 fn persist_display_updates(
301 &self,
302 display_updates: BTreeMap<String, StoredDisplay>,
303 ) -> Result<(), IndexerError> {
304 transactional_blocking_with_retry!(
305 &self.blocking_cp,
306 |conn| {
307 on_conflict_do_update!(
308 display::table,
309 display_updates.values().collect::<Vec<_>>(),
310 display::object_type,
311 (
312 display::id.eq(excluded(display::id)),
313 display::version.eq(excluded(display::version)),
314 display::bcs.eq(excluded(display::bcs)),
315 ),
316 conn
317 );
318 Ok::<(), IndexerError>(())
319 },
320 PG_DB_COMMIT_SLEEP_DURATION
321 )?;
322
323 Ok(())
324 }
325
326 fn persist_object_mutation_chunk(
327 &self,
328 mutated_object_mutation_chunk: Vec<StoredObject>,
329 ) -> Result<(), IndexerError> {
330 let guard = self
331 .metrics
332 .checkpoint_db_commit_latency_objects_chunks
333 .start_timer();
334 let len = mutated_object_mutation_chunk.len();
335 transactional_blocking_with_retry!(
336 &self.blocking_cp,
337 |conn| {
338 on_conflict_do_update!(
339 objects::table,
340 mutated_object_mutation_chunk.clone(),
341 objects::object_id,
342 (
343 objects::object_id.eq(excluded(objects::object_id)),
344 objects::object_version.eq(excluded(objects::object_version)),
345 objects::object_digest.eq(excluded(objects::object_digest)),
346 objects::owner_type.eq(excluded(objects::owner_type)),
347 objects::owner_id.eq(excluded(objects::owner_id)),
348 objects::object_type.eq(excluded(objects::object_type)),
349 objects::serialized_object.eq(excluded(objects::serialized_object)),
350 objects::coin_type.eq(excluded(objects::coin_type)),
351 objects::coin_balance.eq(excluded(objects::coin_balance)),
352 objects::df_kind.eq(excluded(objects::df_kind)),
353 ),
354 conn
355 );
356 Ok::<(), IndexerError>(())
357 },
358 PG_DB_COMMIT_SLEEP_DURATION
359 )
360 .tap_ok(|_| {
361 let elapsed = guard.stop_and_record();
362 info!(elapsed, "Persisted {} chunked objects", len);
363 })
364 .tap_err(|e| {
365 tracing::error!("Failed to persist object mutations with error: {}", e);
366 })
367 }
368
369 fn persist_object_deletion_chunk(
370 &self,
371 deleted_objects_chunk: Vec<StoredDeletedObject>,
372 ) -> Result<(), IndexerError> {
373 let guard = self
374 .metrics
375 .checkpoint_db_commit_latency_objects_chunks
376 .start_timer();
377 let len = deleted_objects_chunk.len();
378 transactional_blocking_with_retry!(
379 &self.blocking_cp,
380 |conn| {
381 diesel::delete(
382 objects::table.filter(
383 objects::object_id.eq_any(
384 deleted_objects_chunk
385 .iter()
386 .map(|o| o.object_id.clone())
387 .collect::<Vec<_>>(),
388 ),
389 ),
390 )
391 .execute(conn)
392 .map_err(IndexerError::from)
393 .context("Failed to write object deletion to PostgresDB")?;
394
395 Ok::<(), IndexerError>(())
396 },
397 PG_DB_COMMIT_SLEEP_DURATION
398 )
399 .tap_ok(|_| {
400 let elapsed = guard.stop_and_record();
401 info!(elapsed, "Deleted {} chunked objects", len);
402 })
403 .tap_err(|e| {
404 tracing::error!("Failed to persist object deletions with error: {}", e);
405 })
406 }
407
408 fn backfill_objects_snapshot_chunk(
409 &self,
410 objects_snapshot: Vec<StoredObjectSnapshot>,
411 ) -> Result<(), IndexerError> {
412 let guard = self
413 .metrics
414 .checkpoint_db_commit_latency_objects_snapshot_chunks
415 .start_timer();
416 transactional_blocking_with_retry!(
417 &self.blocking_cp,
418 |conn| {
419 for objects_snapshot_chunk in
420 objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
421 {
422 on_conflict_do_update!(
423 objects_snapshot::table,
424 objects_snapshot_chunk,
425 objects_snapshot::object_id,
426 (
427 objects_snapshot::object_version
428 .eq(excluded(objects_snapshot::object_version)),
429 objects_snapshot::object_status
430 .eq(excluded(objects_snapshot::object_status)),
431 objects_snapshot::object_digest
432 .eq(excluded(objects_snapshot::object_digest)),
433 objects_snapshot::checkpoint_sequence_number
434 .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
435 objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
436 objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
437 objects_snapshot::object_type_package
438 .eq(excluded(objects_snapshot::object_type_package)),
439 objects_snapshot::object_type_module
440 .eq(excluded(objects_snapshot::object_type_module)),
441 objects_snapshot::object_type_name
442 .eq(excluded(objects_snapshot::object_type_name)),
443 objects_snapshot::object_type
444 .eq(excluded(objects_snapshot::object_type)),
445 objects_snapshot::serialized_object
446 .eq(excluded(objects_snapshot::serialized_object)),
447 objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
448 objects_snapshot::coin_balance
449 .eq(excluded(objects_snapshot::coin_balance)),
450 objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
451 ),
452 conn
453 );
454 }
455 Ok::<(), IndexerError>(())
456 },
457 PG_DB_COMMIT_SLEEP_DURATION
458 )
459 .tap_ok(|_| {
460 let elapsed = guard.stop_and_record();
461 info!(
462 elapsed,
463 "Persisted {} chunked objects snapshot",
464 objects_snapshot.len(),
465 );
466 })
467 .tap_err(|e| {
468 tracing::error!("Failed to persist object snapshot with error: {}", e);
469 })
470 }
471
472 fn persist_objects_history_chunk(
473 &self,
474 stored_objects_history: Vec<StoredHistoryObject>,
475 ) -> Result<(), IndexerError> {
476 let guard = self
477 .metrics
478 .checkpoint_db_commit_latency_objects_history_chunks
479 .start_timer();
480 transactional_blocking_with_retry!(
481 &self.blocking_cp,
482 |conn| {
483 for stored_objects_history_chunk in
484 stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
485 {
486 insert_or_ignore_into!(
487 objects_history::table,
488 stored_objects_history_chunk,
489 conn
490 );
491 }
492 Ok::<(), IndexerError>(())
493 },
494 PG_DB_COMMIT_SLEEP_DURATION
495 )
496 .tap_ok(|_| {
497 let elapsed = guard.stop_and_record();
498 info!(
499 elapsed,
500 "Persisted {} chunked objects history",
501 stored_objects_history.len(),
502 );
503 })
504 .tap_err(|e| {
505 tracing::error!("Failed to persist object history with error: {}", e);
506 })
507 }
508
509 fn persist_object_version_chunk(
510 &self,
511 object_versions: Vec<StoredObjectVersion>,
512 ) -> Result<(), IndexerError> {
513 transactional_blocking_with_retry!(
514 &self.blocking_cp,
515 |conn| {
516 for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
517 {
518 insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
519 }
520 Ok::<(), IndexerError>(())
521 },
522 PG_DB_COMMIT_SLEEP_DURATION
523 )
524 .tap_ok(|_| {
525 info!(
526 "Persisted {} chunked object versions",
527 object_versions.len(),
528 );
529 })
530 .tap_err(|e| {
531 tracing::error!("Failed to persist object version chunk with error: {}", e);
532 })
533 }
534
535 fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
536 let Some(first_checkpoint) = checkpoints.first() else {
537 return Ok(());
538 };
539
540 if first_checkpoint.sequence_number == 0 {
543 let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
544 self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())?;
545 transactional_blocking_with_retry!(
546 &self.blocking_cp,
547 |conn| {
548 let checkpoint_digest =
549 first_checkpoint.checkpoint_digest.into_inner().to_vec();
550 insert_or_ignore_into!(
551 chain_identifier::table,
552 StoredChainIdentifier { checkpoint_digest },
553 conn
554 );
555 Ok::<(), IndexerError>(())
556 },
557 PG_DB_COMMIT_SLEEP_DURATION
558 )?;
559 }
560 let guard = self
561 .metrics
562 .checkpoint_db_commit_latency_checkpoints
563 .start_timer();
564
565 let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
566 transactional_blocking_with_retry!(
567 &self.blocking_cp,
568 |conn| {
569 for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
570 insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
571 }
572 Ok::<(), IndexerError>(())
573 },
574 PG_DB_COMMIT_SLEEP_DURATION
575 )
576 .tap_ok(|_| {
577 info!(
578 "Persisted {} pruner_cp_watermark rows.",
579 stored_cp_txs.len(),
580 );
581 })
582 .tap_err(|e| {
583 tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
584 })?;
585
586 let stored_checkpoints = checkpoints
587 .iter()
588 .map(StoredCheckpoint::from)
589 .collect::<Vec<_>>();
590 transactional_blocking_with_retry!(
591 &self.blocking_cp,
592 |conn| {
593 for stored_checkpoint_chunk in
594 stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
595 {
596 insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
597 let time_now_ms = chrono::Utc::now().timestamp_millis();
598 for stored_checkpoint in stored_checkpoint_chunk {
599 self.metrics
600 .db_commit_lag_ms
601 .set(time_now_ms - stored_checkpoint.timestamp_ms);
602 self.metrics.max_committed_checkpoint_sequence_number.set(
603 stored_checkpoint.sequence_number,
604 );
605 self.metrics.committed_checkpoint_timestamp_ms.set(
606 stored_checkpoint.timestamp_ms,
607 );
608 }
609 for stored_checkpoint in stored_checkpoint_chunk {
610 info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
611 }
612 }
613 Ok::<(), IndexerError>(())
614 },
615 PG_DB_COMMIT_SLEEP_DURATION
616 )
617 .tap_ok(|_| {
618 let elapsed = guard.stop_and_record();
619 info!(
620 elapsed,
621 "Persisted {} checkpoints",
622 stored_checkpoints.len()
623 );
624 })
625 .tap_err(|e| {
626 tracing::error!("Failed to persist checkpoints with error: {}", e);
627 })
628 }
629
630 fn persist_transactions_chunk(
631 &self,
632 transactions: Vec<IndexedTransaction>,
633 ) -> Result<(), IndexerError> {
634 let guard = self
635 .metrics
636 .checkpoint_db_commit_latency_transactions_chunks
637 .start_timer();
638 let transformation_guard = self
639 .metrics
640 .checkpoint_db_commit_latency_transactions_chunks_transformation
641 .start_timer();
642 let transactions = transactions
643 .iter()
644 .map(StoredTransaction::from)
645 .collect::<Vec<_>>();
646 drop(transformation_guard);
647
648 transactional_blocking_with_retry!(
649 &self.blocking_cp,
650 |conn| {
651 for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
652 insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
653 }
654 Ok::<(), IndexerError>(())
655 },
656 PG_DB_COMMIT_SLEEP_DURATION
657 )
658 .tap_ok(|_| {
659 let elapsed = guard.stop_and_record();
660 info!(
661 elapsed,
662 "Persisted {} chunked transactions",
663 transactions.len()
664 );
665 })
666 .tap_err(|e| {
667 tracing::error!("Failed to persist transactions with error: {}", e);
668 })
669 }
670
671 fn persist_tx_insertion_order_chunk(
672 &self,
673 tx_order: Vec<TxInsertionOrder>,
674 ) -> Result<(), IndexerError> {
675 let guard = self
676 .metrics
677 .checkpoint_db_commit_latency_tx_insertion_order_chunks
678 .start_timer();
679
680 transactional_blocking_with_retry!(
681 &self.blocking_cp,
682 |conn| {
683 for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
684 insert_or_ignore_into!(tx_insertion_order::table, tx_order_chunk, conn);
685 }
686 Ok::<(), IndexerError>(())
687 },
688 PG_DB_COMMIT_SLEEP_DURATION
689 )
690 .tap_ok(|_| {
691 let elapsed = guard.stop_and_record();
692 info!(
693 elapsed,
694 "Persisted {} chunked txs insertion order",
695 tx_order.len()
696 );
697 })
698 .tap_err(|e| {
699 tracing::error!("Failed to persist txs insertion order with error: {e}");
700 })
701 }
702
703 fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
704 let guard = self
705 .metrics
706 .checkpoint_db_commit_latency_events_chunks
707 .start_timer();
708 let len = events.len();
709 let events = events
710 .into_iter()
711 .map(StoredEvent::from)
712 .collect::<Vec<_>>();
713
714 transactional_blocking_with_retry!(
715 &self.blocking_cp,
716 |conn| {
717 for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
718 insert_or_ignore_into!(events::table, event_chunk, conn);
719 }
720 Ok::<(), IndexerError>(())
721 },
722 PG_DB_COMMIT_SLEEP_DURATION
723 )
724 .tap_ok(|_| {
725 let elapsed = guard.stop_and_record();
726 info!(elapsed, "Persisted {} chunked events", len);
727 })
728 .tap_err(|e| {
729 tracing::error!("Failed to persist events with error: {}", e);
730 })
731 }
732
733 fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
734 if packages.is_empty() {
735 return Ok(());
736 }
737 let guard = self
738 .metrics
739 .checkpoint_db_commit_latency_packages
740 .start_timer();
741 let packages = packages
742 .into_iter()
743 .map(StoredPackage::from)
744 .collect::<Vec<_>>();
745 transactional_blocking_with_retry!(
746 &self.blocking_cp,
747 |conn| {
748 for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
749 on_conflict_do_update!(
750 packages::table,
751 packages_chunk,
752 packages::package_id,
753 (
754 packages::package_id.eq(excluded(packages::package_id)),
755 packages::move_package.eq(excluded(packages::move_package)),
756 ),
757 conn
758 );
759 }
760 Ok::<(), IndexerError>(())
761 },
762 PG_DB_COMMIT_SLEEP_DURATION
763 )
764 .tap_ok(|_| {
765 let elapsed = guard.stop_and_record();
766 info!(elapsed, "Persisted {} packages", packages.len());
767 })
768 .tap_err(|e| {
769 tracing::error!("Failed to persist packages with error: {}", e);
770 })
771 }
772
773 async fn persist_event_indices_chunk(
774 &self,
775 indices: Vec<EventIndex>,
776 ) -> Result<(), IndexerError> {
777 let guard = self
778 .metrics
779 .checkpoint_db_commit_latency_event_indices_chunks
780 .start_timer();
781 let len = indices.len();
782 let (
783 event_emit_packages,
784 event_emit_modules,
785 event_senders,
786 event_struct_packages,
787 event_struct_modules,
788 event_struct_names,
789 event_struct_instantiations,
790 ) = indices.into_iter().map(|i| i.split()).fold(
791 (
792 Vec::new(),
793 Vec::new(),
794 Vec::new(),
795 Vec::new(),
796 Vec::new(),
797 Vec::new(),
798 Vec::new(),
799 ),
800 |(
801 mut event_emit_packages,
802 mut event_emit_modules,
803 mut event_senders,
804 mut event_struct_packages,
805 mut event_struct_modules,
806 mut event_struct_names,
807 mut event_struct_instantiations,
808 ),
809 index| {
810 event_emit_packages.push(index.0);
811 event_emit_modules.push(index.1);
812 event_senders.push(index.2);
813 event_struct_packages.push(index.3);
814 event_struct_modules.push(index.4);
815 event_struct_names.push(index.5);
816 event_struct_instantiations.push(index.6);
817 (
818 event_emit_packages,
819 event_emit_modules,
820 event_senders,
821 event_struct_packages,
822 event_struct_modules,
823 event_struct_names,
824 event_struct_instantiations,
825 )
826 },
827 );
828
829 let mut futures = vec![];
831 futures.push(self.spawn_blocking_task(move |this| {
832 persist_chunk_into_table!(
833 event_emit_package::table,
834 event_emit_packages,
835 &this.blocking_cp
836 )
837 }));
838
839 futures.push(self.spawn_blocking_task(move |this| {
840 persist_chunk_into_table!(
841 event_emit_module::table,
842 event_emit_modules,
843 &this.blocking_cp
844 )
845 }));
846
847 futures.push(self.spawn_blocking_task(move |this| {
848 persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
849 }));
850
851 futures.push(self.spawn_blocking_task(move |this| {
852 persist_chunk_into_table!(
853 event_struct_package::table,
854 event_struct_packages,
855 &this.blocking_cp
856 )
857 }));
858
859 futures.push(self.spawn_blocking_task(move |this| {
860 persist_chunk_into_table!(
861 event_struct_module::table,
862 event_struct_modules,
863 &this.blocking_cp
864 )
865 }));
866
867 futures.push(self.spawn_blocking_task(move |this| {
868 persist_chunk_into_table!(
869 event_struct_name::table,
870 event_struct_names,
871 &this.blocking_cp
872 )
873 }));
874
875 futures.push(self.spawn_blocking_task(move |this| {
876 persist_chunk_into_table!(
877 event_struct_instantiation::table,
878 event_struct_instantiations,
879 &this.blocking_cp
880 )
881 }));
882
883 futures::future::try_join_all(futures)
884 .await
885 .map_err(|e| {
886 tracing::error!("Failed to join event indices futures in a chunk: {}", e);
887 IndexerError::from(e)
888 })?
889 .into_iter()
890 .collect::<Result<Vec<_>, _>>()
891 .map_err(|e| {
892 IndexerError::PostgresWrite(format!(
893 "Failed to persist all event indices in a chunk: {e:?}"
894 ))
895 })?;
896 let elapsed = guard.stop_and_record();
897 info!(elapsed, "Persisted {} chunked event indices", len);
898 Ok(())
899 }
900
901 async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
902 let guard = self
903 .metrics
904 .checkpoint_db_commit_latency_tx_indices_chunks
905 .start_timer();
906 let len = indices.len();
907 let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
908 indices.into_iter().map(|i| i.split()).fold(
909 (
910 Vec::new(),
911 Vec::new(),
912 Vec::new(),
913 Vec::new(),
914 Vec::new(),
915 Vec::new(),
916 Vec::new(),
917 Vec::new(),
918 Vec::new(),
919 ),
920 |(
921 mut tx_senders,
922 mut tx_recipients,
923 mut tx_input_objects,
924 mut tx_changed_objects,
925 mut tx_pkgs,
926 mut tx_mods,
927 mut tx_funs,
928 mut tx_digests,
929 mut tx_kinds,
930 ),
931 index| {
932 tx_senders.extend(index.0);
933 tx_recipients.extend(index.1);
934 tx_input_objects.extend(index.2);
935 tx_changed_objects.extend(index.3);
936 tx_pkgs.extend(index.4);
937 tx_mods.extend(index.5);
938 tx_funs.extend(index.6);
939 tx_digests.extend(index.7);
940 tx_kinds.extend(index.8);
941 (
942 tx_senders,
943 tx_recipients,
944 tx_input_objects,
945 tx_changed_objects,
946 tx_pkgs,
947 tx_mods,
948 tx_funs,
949 tx_digests,
950 tx_kinds,
951 )
952 },
953 );
954
955 let mut futures = vec![];
956 futures.push(self.spawn_blocking_task(move |this| {
957 let now = Instant::now();
958 let senders_len = senders.len();
959 let recipients_len = recipients.len();
960 transactional_blocking_with_retry!(
961 &this.blocking_cp,
962 |conn| {
963 for chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
964 insert_or_ignore_into!(tx_senders::table, chunk, conn);
965 }
966 for chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
967 insert_or_ignore_into!(tx_recipients::table, chunk, conn);
968 }
969 Ok::<(), IndexerError>(())
970 },
971 PG_DB_COMMIT_SLEEP_DURATION
972 )
973 .tap_ok(|_| {
974 let elapsed = now.elapsed().as_secs_f64();
975 info!(
976 elapsed,
977 "Persisted {} rows to tx_senders and {} rows to tx_recipients",
978 senders_len,
979 recipients_len,
980 );
981 })
982 .tap_err(|e| {
983 tracing::error!(
984 "Failed to persist tx_senders and tx_recipients with error: {}",
985 e
986 );
987 })
988 }));
989
990 futures.push(self.spawn_blocking_task(move |this| {
991 let now = Instant::now();
992 let input_objects_len = input_objects.len();
993 transactional_blocking_with_retry!(
994 &this.blocking_cp,
995 |conn| {
996 for chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
997 insert_or_ignore_into!(tx_input_objects::table, chunk, conn);
998 }
999 Ok::<(), IndexerError>(())
1000 },
1001 PG_DB_COMMIT_SLEEP_DURATION
1002 )
1003 .tap_ok(|_| {
1004 let elapsed = now.elapsed().as_secs_f64();
1005 info!(
1006 elapsed,
1007 "Persisted {} rows to tx_input_objects", input_objects_len,
1008 );
1009 })
1010 .tap_err(|e| {
1011 tracing::error!("Failed to persist tx_input_objects with error: {}", e);
1012 })
1013 }));
1014
1015 futures.push(self.spawn_blocking_task(move |this| {
1016 let now = Instant::now();
1017 let changed_objects_len = changed_objects.len();
1018 transactional_blocking_with_retry!(
1019 &this.blocking_cp,
1020 |conn| {
1021 for chunk in changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1022 insert_or_ignore_into!(tx_changed_objects::table, chunk, conn);
1023 }
1024 Ok::<(), IndexerError>(())
1025 },
1026 PG_DB_COMMIT_SLEEP_DURATION
1027 )
1028 .tap_ok(|_| {
1029 let elapsed = now.elapsed().as_secs_f64();
1030 info!(
1031 elapsed,
1032 "Persisted {} rows to tx_changed_objects table", changed_objects_len,
1033 );
1034 })
1035 .tap_err(|e| {
1036 tracing::error!("Failed to persist tx_changed_objects with error: {}", e);
1037 })
1038 }));
1039
1040 futures.push(self.spawn_blocking_task(move |this| {
1041 let now = Instant::now();
1042 let rows_len = pkgs.len();
1043 transactional_blocking_with_retry!(
1044 &this.blocking_cp,
1045 |conn| {
1046 for chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1047 insert_or_ignore_into!(tx_calls_pkg::table, chunk, conn);
1048 }
1049 Ok::<(), IndexerError>(())
1050 },
1051 PG_DB_COMMIT_SLEEP_DURATION
1052 )
1053 .tap_ok(|_| {
1054 let elapsed = now.elapsed().as_secs_f64();
1055 info!(
1056 elapsed,
1057 "Persisted {} rows to tx_calls_pkg tables", rows_len
1058 );
1059 })
1060 .tap_err(|e| {
1061 tracing::error!("Failed to persist tx_calls_pkg with error: {}", e);
1062 })
1063 }));
1064
1065 futures.push(self.spawn_blocking_task(move |this| {
1066 let now = Instant::now();
1067 let rows_len = mods.len();
1068 transactional_blocking_with_retry!(
1069 &this.blocking_cp,
1070 |conn| {
1071 for chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1072 insert_or_ignore_into!(tx_calls_mod::table, chunk, conn);
1073 }
1074 Ok::<(), IndexerError>(())
1075 },
1076 PG_DB_COMMIT_SLEEP_DURATION
1077 )
1078 .tap_ok(|_| {
1079 let elapsed = now.elapsed().as_secs_f64();
1080 info!(elapsed, "Persisted {} rows to tx_calls_mod table", rows_len);
1081 })
1082 .tap_err(|e| {
1083 tracing::error!("Failed to persist tx_calls_mod with error: {}", e);
1084 })
1085 }));
1086
1087 futures.push(self.spawn_blocking_task(move |this| {
1088 let now = Instant::now();
1089 let rows_len = funs.len();
1090 transactional_blocking_with_retry!(
1091 &this.blocking_cp,
1092 |conn| {
1093 for chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1094 insert_or_ignore_into!(tx_calls_fun::table, chunk, conn);
1095 }
1096 Ok::<(), IndexerError>(())
1097 },
1098 PG_DB_COMMIT_SLEEP_DURATION
1099 )
1100 .tap_ok(|_| {
1101 let elapsed = now.elapsed().as_secs_f64();
1102 info!(elapsed, "Persisted {} rows to tx_calls_fun table", rows_len);
1103 })
1104 .tap_err(|e| {
1105 tracing::error!("Failed to persist tx_calls_fun with error: {}", e);
1106 })
1107 }));
1108
1109 futures.push(self.spawn_blocking_task(move |this| {
1110 let now = Instant::now();
1111 let calls_len = digests.len();
1112 transactional_blocking_with_retry!(
1113 &this.blocking_cp,
1114 |conn| {
1115 for chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1116 insert_or_ignore_into!(tx_digests::table, chunk, conn);
1117 }
1118 Ok::<(), IndexerError>(())
1119 },
1120 Duration::from_secs(60)
1121 )
1122 .tap_ok(|_| {
1123 let elapsed = now.elapsed().as_secs_f64();
1124 info!(elapsed, "Persisted {} rows to tx_digests tables", calls_len);
1125 })
1126 .tap_err(|e| {
1127 tracing::error!("Failed to persist tx_digests with error: {}", e);
1128 })
1129 }));
1130
1131 futures.push(self.spawn_blocking_task(move |this| {
1132 let now = Instant::now();
1133 let rows_len = kinds.len();
1134 transactional_blocking_with_retry!(
1135 &this.blocking_cp,
1136 |conn| {
1137 for chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1138 insert_or_ignore_into!(tx_kinds::table, chunk, conn);
1139 }
1140 Ok::<(), IndexerError>(())
1141 },
1142 Duration::from_secs(60)
1143 )
1144 .tap_ok(|_| {
1145 let elapsed = now.elapsed().as_secs_f64();
1146 info!(elapsed, "Persisted {} rows to tx_kinds tables", rows_len);
1147 })
1148 .tap_err(|e| {
1149 tracing::error!("Failed to persist tx_kinds with error: {}", e);
1150 })
1151 }));
1152
1153 futures::future::try_join_all(futures)
1154 .await
1155 .map_err(|e| {
1156 tracing::error!("Failed to join tx indices futures in a chunk: {}", e);
1157 IndexerError::from(e)
1158 })?
1159 .into_iter()
1160 .collect::<Result<Vec<_>, _>>()
1161 .map_err(|e| {
1162 IndexerError::PostgresWrite(format!(
1163 "Failed to persist all tx indices in a chunk: {e:?}"
1164 ))
1165 })?;
1166 let elapsed = guard.stop_and_record();
1167 info!(elapsed, "Persisted {} chunked tx_indices", len);
1168 Ok(())
1169 }
1170
1171 fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1172 let guard = self
1173 .metrics
1174 .checkpoint_db_commit_latency_epoch
1175 .start_timer();
1176 let epoch_id = epoch.new_epoch.epoch;
1177
1178 transactional_blocking_with_retry!(
1179 &self.blocking_cp,
1180 |conn| {
1181 if let Some(last_epoch) = &epoch.last_epoch {
1182 let last_epoch_id = last_epoch.epoch;
1183 let previous_epoch_network_total_transactions = match epoch_id {
1188 0 | 1 => 0,
1189 _ => {
1190 let prev_epoch_id = epoch_id - 2;
1191 let result = checkpoints::table
1192 .filter(checkpoints::epoch.eq(prev_epoch_id as i64))
1193 .select(max(checkpoints::network_total_transactions))
1194 .first::<Option<i64>>(conn)
1195 .map(|o| o.unwrap_or(0))?;
1196
1197 result as u64
1198 }
1199 };
1200
1201 let epoch_total_transactions = epoch.network_total_transactions
1202 - previous_epoch_network_total_transactions;
1203
1204 let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
1205 last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64);
1206 info!(last_epoch_id, "Persisting epoch end data.");
1207 on_conflict_do_update!(
1208 epochs::table,
1209 vec![last_epoch],
1210 epochs::epoch,
1211 (
1212 epochs::epoch_total_transactions
1214 .eq(excluded(epochs::epoch_total_transactions)),
1215 epochs::last_checkpoint_id.eq(excluded(epochs::last_checkpoint_id)),
1216 epochs::epoch_end_timestamp.eq(excluded(epochs::epoch_end_timestamp)),
1217 epochs::storage_charge.eq(excluded(epochs::storage_charge)),
1218 epochs::storage_rebate.eq(excluded(epochs::storage_rebate)),
1219 epochs::total_gas_fees.eq(excluded(epochs::total_gas_fees)),
1220 epochs::total_stake_rewards_distributed
1221 .eq(excluded(epochs::total_stake_rewards_distributed)),
1222 epochs::epoch_commitments.eq(excluded(epochs::epoch_commitments)),
1223 epochs::burnt_tokens_amount.eq(excluded(epochs::burnt_tokens_amount)),
1224 epochs::minted_tokens_amount.eq(excluded(epochs::minted_tokens_amount)),
1225 ),
1226 conn
1227 );
1228 }
1229
1230 let epoch_id = epoch.new_epoch.epoch;
1231 info!(epoch_id, "Persisting epoch beginning info");
1232 let new_epoch = StoredEpochInfo::from_epoch_beginning_info(&epoch.new_epoch);
1233 insert_or_ignore_into!(epochs::table, new_epoch, conn);
1234 Ok::<(), IndexerError>(())
1235 },
1236 PG_DB_COMMIT_SLEEP_DURATION
1237 )
1238 .tap_ok(|_| {
1239 let elapsed = guard.stop_and_record();
1240 info!(elapsed, epoch_id, "Persisted epoch beginning info");
1241 })
1242 .tap_err(|e| {
1243 tracing::error!("Failed to persist epoch with error: {}", e);
1244 })
1245 }
1246
1247 fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1248 let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1249 if let Some(last_epoch_id) = last_epoch_id {
1251 let last_db_epoch: Option<StoredEpochInfo> =
1252 read_only_blocking!(&self.blocking_cp, |conn| {
1253 epochs::table
1254 .filter(epochs::epoch.eq(last_epoch_id as i64))
1255 .first::<StoredEpochInfo>(conn)
1256 .optional()
1257 })
1258 .context("Failed to read last epoch from PostgresDB")?;
1259 if let Some(last_epoch) = last_db_epoch {
1260 let epoch_partition_data =
1261 EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1262 let table_partitions = self.partition_manager.get_table_partitions()?;
1263 for (table, (_, last_partition)) in table_partitions {
1264 if !self
1266 .partition_manager
1267 .get_strategy(&table)
1268 .is_epoch_partitioned()
1269 {
1270 continue;
1271 }
1272 let guard = self.metrics.advance_epoch_latency.start_timer();
1273 self.partition_manager.advance_epoch(
1274 table.clone(),
1275 last_partition,
1276 &epoch_partition_data,
1277 )?;
1278 let elapsed = guard.stop_and_record();
1279 info!(
1280 elapsed,
1281 "Advanced epoch partition {} for table {}",
1282 last_partition,
1283 table.clone()
1284 );
1285 }
1286 } else {
1287 tracing::error!("Last epoch: {} from PostgresDB is None.", last_epoch_id);
1288 }
1289 }
1290
1291 Ok(())
1292 }
1293
1294 fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
1295 transactional_blocking_with_retry!(
1296 &self.blocking_cp,
1297 |conn| {
1298 diesel::delete(
1299 checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
1300 )
1301 .execute(conn)
1302 .map_err(IndexerError::from)
1303 .context("Failed to prune checkpoints table")?;
1304
1305 Ok::<(), IndexerError>(())
1306 },
1307 PG_DB_COMMIT_SLEEP_DURATION
1308 )
1309 }
1310
1311 fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1312 let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1313 transactional_blocking_with_retry!(
1314 &self.blocking_cp,
1315 |conn| {
1316 prune_tx_or_event_indice_table!(
1317 event_emit_module,
1318 conn,
1319 min_tx,
1320 max_tx,
1321 "Failed to prune event_emit_module table"
1322 );
1323 prune_tx_or_event_indice_table!(
1324 event_emit_package,
1325 conn,
1326 min_tx,
1327 max_tx,
1328 "Failed to prune event_emit_package table"
1329 );
1330 prune_tx_or_event_indice_table![
1331 event_senders,
1332 conn,
1333 min_tx,
1334 max_tx,
1335 "Failed to prune event_senders table"
1336 ];
1337 prune_tx_or_event_indice_table![
1338 event_struct_instantiation,
1339 conn,
1340 min_tx,
1341 max_tx,
1342 "Failed to prune event_struct_instantiation table"
1343 ];
1344 prune_tx_or_event_indice_table![
1345 event_struct_module,
1346 conn,
1347 min_tx,
1348 max_tx,
1349 "Failed to prune event_struct_module table"
1350 ];
1351 prune_tx_or_event_indice_table![
1352 event_struct_name,
1353 conn,
1354 min_tx,
1355 max_tx,
1356 "Failed to prune event_struct_name table"
1357 ];
1358 prune_tx_or_event_indice_table![
1359 event_struct_package,
1360 conn,
1361 min_tx,
1362 max_tx,
1363 "Failed to prune event_struct_package table"
1364 ];
1365 Ok::<(), IndexerError>(())
1366 },
1367 PG_DB_COMMIT_SLEEP_DURATION
1368 )
1369 }
1370
1371 fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1372 let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1373 transactional_blocking_with_retry!(
1374 &self.blocking_cp,
1375 |conn| {
1376 prune_tx_or_event_indice_table!(
1377 tx_senders,
1378 conn,
1379 min_tx,
1380 max_tx,
1381 "Failed to prune tx_senders table"
1382 );
1383 prune_tx_or_event_indice_table!(
1384 tx_recipients,
1385 conn,
1386 min_tx,
1387 max_tx,
1388 "Failed to prune tx_recipients table"
1389 );
1390 prune_tx_or_event_indice_table![
1391 tx_input_objects,
1392 conn,
1393 min_tx,
1394 max_tx,
1395 "Failed to prune tx_input_objects table"
1396 ];
1397 prune_tx_or_event_indice_table![
1398 tx_changed_objects,
1399 conn,
1400 min_tx,
1401 max_tx,
1402 "Failed to prune tx_changed_objects table"
1403 ];
1404 prune_tx_or_event_indice_table![
1405 tx_calls_pkg,
1406 conn,
1407 min_tx,
1408 max_tx,
1409 "Failed to prune tx_calls_pkg table"
1410 ];
1411 prune_tx_or_event_indice_table![
1412 tx_calls_mod,
1413 conn,
1414 min_tx,
1415 max_tx,
1416 "Failed to prune tx_calls_mod table"
1417 ];
1418 prune_tx_or_event_indice_table![
1419 tx_calls_fun,
1420 conn,
1421 min_tx,
1422 max_tx,
1423 "Failed to prune tx_calls_fun table"
1424 ];
1425 prune_tx_or_event_indice_table![
1426 tx_digests,
1427 conn,
1428 min_tx,
1429 max_tx,
1430 "Failed to prune tx_digests table"
1431 ];
1432 Ok::<(), IndexerError>(())
1433 },
1434 PG_DB_COMMIT_SLEEP_DURATION
1435 )
1436 }
1437
1438 fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1439 transactional_blocking_with_retry!(
1440 &self.blocking_cp,
1441 |conn| {
1442 diesel::delete(
1443 pruner_cp_watermark::table
1444 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1445 )
1446 .execute(conn)
1447 .map_err(IndexerError::from)
1448 .context("Failed to prune pruner_cp_watermark table")?;
1449 Ok::<(), IndexerError>(())
1450 },
1451 PG_DB_COMMIT_SLEEP_DURATION
1452 )
1453 }
1454
1455 fn get_network_total_transactions_by_end_of_epoch(
1456 &self,
1457 epoch: u64,
1458 ) -> Result<u64, IndexerError> {
1459 read_only_blocking!(&self.blocking_cp, |conn| {
1460 checkpoints::table
1461 .filter(checkpoints::epoch.eq(epoch as i64))
1462 .select(checkpoints::network_total_transactions)
1463 .order_by(checkpoints::sequence_number.desc())
1464 .first::<i64>(conn)
1465 })
1466 .context("Failed to get network total transactions in epoch")
1467 .map(|v| v as u64)
1468 }
1469
1470 fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1471 transactional_blocking_with_retry!(
1472 &self.blocking_cp,
1473 |conn| {
1474 diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1475 .execute(conn)?;
1476 Ok::<(), IndexerError>(())
1477 },
1478 PG_DB_COMMIT_SLEEP_DURATION
1479 )
1480 .tap_ok(|_| {
1481 info!("Successfully refreshed participation_metrics");
1482 })
1483 .tap_err(|e| {
1484 tracing::error!("Failed to refresh participation_metrics: {e}");
1485 })
1486 }
1487
1488 async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1489 where
1490 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1491 R: Send + 'static,
1492 {
1493 let this = self.clone();
1494 let current_span = tracing::Span::current();
1495 tokio::task::spawn_blocking(move || {
1496 let _guard = current_span.enter();
1497 f(this)
1498 })
1499 .await
1500 .map_err(Into::into)
1501 .and_then(std::convert::identity)
1502 }
1503
1504 fn spawn_blocking_task<F, R>(
1505 &self,
1506 f: F,
1507 ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1508 where
1509 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1510 R: Send + 'static,
1511 {
1512 let this = self.clone();
1513 let current_span = tracing::Span::current();
1514 let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1515 tokio::task::spawn_blocking(move || {
1516 let _guard = current_span.enter();
1517 let _elapsed = guard.stop_and_record();
1518 f(this)
1519 })
1520 }
1521
1522 fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1523 where
1524 F: FnOnce(Self) -> Fut + Send + 'static,
1525 Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1526 R: Send + 'static,
1527 {
1528 let this = self.clone();
1529 tokio::task::spawn(async move { f(this).await })
1530 }
1531}
1532
1533#[async_trait]
1534impl IndexerStore for PgIndexerStore {
1535 async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1536 self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1537 .await
1538 }
1539
1540 async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1541 self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1542 .await
1543 }
1544
1545 async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1546 self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1547 .await
1548 }
1549
1550 async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1551 self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1552 .await
1553 }
1554
1555 async fn get_latest_object_snapshot_checkpoint_sequence_number(
1556 &self,
1557 ) -> Result<Option<u64>, IndexerError> {
1558 self.execute_in_blocking_worker(|this| {
1559 this.get_latest_object_snapshot_checkpoint_sequence_number()
1560 })
1561 .await
1562 }
1563
1564 async fn persist_objects(
1565 &self,
1566 object_changes: Vec<TransactionObjectChangesToCommit>,
1567 ) -> Result<(), IndexerError> {
1568 if object_changes.is_empty() {
1569 return Ok(());
1570 }
1571 let guard = self
1572 .metrics
1573 .checkpoint_db_commit_latency_objects
1574 .start_timer();
1575 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1576 let object_mutations = indexed_mutations
1577 .into_iter()
1578 .map(StoredObject::from)
1579 .collect::<Vec<_>>();
1580 let object_deletions = indexed_deletions
1581 .into_iter()
1582 .map(StoredDeletedObject::from)
1583 .collect::<Vec<_>>();
1584 let mutation_len = object_mutations.len();
1585 let deletion_len = object_deletions.len();
1586
1587 let object_mutation_chunks =
1588 chunk!(object_mutations, self.config.parallel_objects_chunk_size);
1589 let object_deletion_chunks =
1590 chunk!(object_deletions, self.config.parallel_objects_chunk_size);
1591 let mutation_futures = object_mutation_chunks
1592 .into_iter()
1593 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_mutation_chunk(c)));
1594 futures::future::try_join_all(mutation_futures)
1595 .await
1596 .map_err(|e| {
1597 tracing::error!(
1598 "Failed to join persist_object_mutation_chunk futures: {}",
1599 e
1600 );
1601 IndexerError::from(e)
1602 })?
1603 .into_iter()
1604 .collect::<Result<Vec<_>, _>>()
1605 .map_err(|e| {
1606 IndexerError::PostgresWrite(format!(
1607 "Failed to persist all object mutation chunks: {e:?}"
1608 ))
1609 })?;
1610 let deletion_futures = object_deletion_chunks
1611 .into_iter()
1612 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_deletion_chunk(c)));
1613 futures::future::try_join_all(deletion_futures)
1614 .await
1615 .map_err(|e| {
1616 tracing::error!(
1617 "Failed to join persist_object_deletion_chunk futures: {}",
1618 e
1619 );
1620 IndexerError::from(e)
1621 })?
1622 .into_iter()
1623 .collect::<Result<Vec<_>, _>>()
1624 .map_err(|e| {
1625 IndexerError::PostgresWrite(format!(
1626 "Failed to persist all object deletion chunks: {e:?}"
1627 ))
1628 })?;
1629
1630 let elapsed = guard.stop_and_record();
1631 info!(
1632 elapsed,
1633 "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
1634 );
1635 Ok(())
1636 }
1637
1638 async fn persist_objects_snapshot(
1639 &self,
1640 object_changes: Vec<TransactionObjectChangesToCommit>,
1641 ) -> Result<(), IndexerError> {
1642 if object_changes.is_empty() {
1643 return Ok(());
1644 }
1645 let guard = self
1646 .metrics
1647 .checkpoint_db_commit_latency_objects_snapshot
1648 .start_timer();
1649 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1650 let objects_snapshot = indexed_mutations
1651 .into_iter()
1652 .map(StoredObjectSnapshot::from)
1653 .chain(
1654 indexed_deletions
1655 .into_iter()
1656 .map(StoredObjectSnapshot::from),
1657 )
1658 .collect::<Vec<_>>();
1659 let len = objects_snapshot.len();
1660 let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
1661 let futures = chunks
1662 .into_iter()
1663 .map(|c| self.spawn_blocking_task(move |this| this.backfill_objects_snapshot_chunk(c)));
1664
1665 futures::future::try_join_all(futures)
1666 .await
1667 .map_err(|e| {
1668 tracing::error!(
1669 "Failed to join backfill_objects_snapshot_chunk futures: {}",
1670 e
1671 );
1672 IndexerError::from(e)
1673 })?
1674 .into_iter()
1675 .collect::<Result<Vec<_>, _>>()
1676 .map_err(|e| {
1677 IndexerError::PostgresWrite(format!(
1678 "Failed to persist all objects snapshot chunks: {e:?}"
1679 ))
1680 })?;
1681 let elapsed = guard.stop_and_record();
1682 info!(elapsed, "Persisted {} objects snapshot", len);
1683 Ok(())
1684 }
1685
1686 async fn persist_object_history(
1687 &self,
1688 object_changes: Vec<TransactionObjectChangesToCommit>,
1689 ) -> Result<(), IndexerError> {
1690 let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1691 .map(|val| val.eq_ignore_ascii_case("true"))
1692 .unwrap_or(false);
1693 if skip_history {
1694 info!("skipping object history");
1695 return Ok(());
1696 }
1697
1698 if object_changes.is_empty() {
1699 return Ok(());
1700 }
1701 let objects = make_objects_history_to_commit(object_changes);
1702 let guard = self
1703 .metrics
1704 .checkpoint_db_commit_latency_objects_history
1705 .start_timer();
1706
1707 let len = objects.len();
1708 let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1709 let futures = chunks
1710 .into_iter()
1711 .map(|c| self.spawn_blocking_task(move |this| this.persist_objects_history_chunk(c)));
1712
1713 futures::future::try_join_all(futures)
1714 .await
1715 .map_err(|e| {
1716 tracing::error!(
1717 "Failed to join persist_objects_history_chunk futures: {}",
1718 e
1719 );
1720 IndexerError::from(e)
1721 })?
1722 .into_iter()
1723 .collect::<Result<Vec<_>, _>>()
1724 .map_err(|e| {
1725 IndexerError::PostgresWrite(format!(
1726 "Failed to persist all objects history chunks: {e:?}"
1727 ))
1728 })?;
1729 let elapsed = guard.stop_and_record();
1730 info!(elapsed, "Persisted {} objects history", len);
1731 Ok(())
1732 }
1733
1734 async fn persist_object_versions(
1735 &self,
1736 object_versions: Vec<StoredObjectVersion>,
1737 ) -> Result<(), IndexerError> {
1738 if object_versions.is_empty() {
1739 return Ok(());
1740 }
1741 let object_versions_count = object_versions.len();
1742
1743 let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1744 let futures = chunks
1745 .into_iter()
1746 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1747 .collect::<Vec<_>>();
1748
1749 futures::future::try_join_all(futures)
1750 .await
1751 .map_err(|e| {
1752 tracing::error!("Failed to join persist_object_version_chunk futures: {}", e);
1753 IndexerError::from(e)
1754 })?
1755 .into_iter()
1756 .collect::<Result<Vec<_>, _>>()
1757 .map_err(|e| {
1758 IndexerError::PostgresWrite(format!(
1759 "Failed to persist all object version chunks: {e:?}"
1760 ))
1761 })?;
1762 info!("Persisted {} objects history", object_versions_count);
1763 Ok(())
1764 }
1765
1766 async fn persist_checkpoints(
1767 &self,
1768 checkpoints: Vec<IndexedCheckpoint>,
1769 ) -> Result<(), IndexerError> {
1770 self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1771 .await
1772 }
1773
1774 async fn persist_transactions(
1775 &self,
1776 transactions: Vec<IndexedTransaction>,
1777 ) -> Result<(), IndexerError> {
1778 let guard = self
1779 .metrics
1780 .checkpoint_db_commit_latency_transactions
1781 .start_timer();
1782 let len = transactions.len();
1783
1784 let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1785 let futures = chunks
1786 .into_iter()
1787 .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1788
1789 futures::future::try_join_all(futures)
1790 .await
1791 .map_err(|e| {
1792 tracing::error!("Failed to join persist_transactions_chunk futures: {}", e);
1793 IndexerError::from(e)
1794 })?
1795 .into_iter()
1796 .collect::<Result<Vec<_>, _>>()
1797 .map_err(|e| {
1798 IndexerError::PostgresWrite(format!(
1799 "Failed to persist all transactions chunks: {e:?}"
1800 ))
1801 })?;
1802 let elapsed = guard.stop_and_record();
1803 info!(elapsed, "Persisted {} transactions", len);
1804 Ok(())
1805 }
1806
1807 async fn persist_optimistic_transaction(
1808 &self,
1809 transaction: OptimisticTransaction,
1810 ) -> Result<(), IndexerError> {
1811 let insertion_order = transaction.insertion_order;
1812
1813 self.spawn_blocking_task(move |this| {
1814 transactional_blocking_with_retry!(
1815 &this.blocking_cp,
1816 |conn| {
1817 insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1818 Ok::<(), IndexerError>(())
1819 },
1820 PG_DB_COMMIT_SLEEP_DURATION
1821 )
1822 .tap_err(|e| {
1823 tracing::error!("Failed to persist transactions with error: {}", e);
1824 })
1825 })
1826 .await
1827 .map_err(|e| {
1828 IndexerError::PostgresWrite(format!("Failed to persist optimistic transaction: {e:?}"))
1829 })??;
1830
1831 info!("Persisted optimistic transaction {insertion_order}");
1832 Ok(())
1833 }
1834
1835 async fn persist_tx_insertion_order(
1836 &self,
1837 tx_order: Vec<TxInsertionOrder>,
1838 ) -> Result<(), IndexerError> {
1839 let guard = self
1840 .metrics
1841 .checkpoint_db_commit_latency_tx_insertion_order
1842 .start_timer();
1843 let len = tx_order.len();
1844
1845 let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
1846 let futures = chunks.into_iter().map(|c| {
1847 self.spawn_blocking_task(move |this| this.persist_tx_insertion_order_chunk(c))
1848 });
1849
1850 futures::future::try_join_all(futures)
1851 .await
1852 .map_err(|e| {
1853 tracing::error!("Failed to join persist_tx_insertion_order_chunk futures: {e}",);
1854 IndexerError::from(e)
1855 })?
1856 .into_iter()
1857 .collect::<Result<Vec<_>, _>>()
1858 .map_err(|e| {
1859 IndexerError::PostgresWrite(format!(
1860 "Failed to persist all txs insertion order chunks: {e:?}",
1861 ))
1862 })?;
1863 let elapsed = guard.stop_and_record();
1864 info!(elapsed, "Persisted {len} txs insertion orders");
1865 Ok(())
1866 }
1867
1868 async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1869 if events.is_empty() {
1870 return Ok(());
1871 }
1872 let len = events.len();
1873 let guard = self
1874 .metrics
1875 .checkpoint_db_commit_latency_events
1876 .start_timer();
1877 let chunks = chunk!(events, self.config.parallel_chunk_size);
1878 let futures = chunks
1879 .into_iter()
1880 .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1881
1882 futures::future::try_join_all(futures)
1883 .await
1884 .map_err(|e| {
1885 tracing::error!("Failed to join persist_events_chunk futures: {}", e);
1886 IndexerError::from(e)
1887 })?
1888 .into_iter()
1889 .collect::<Result<Vec<_>, _>>()
1890 .map_err(|e| {
1891 IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
1892 })?;
1893 let elapsed = guard.stop_and_record();
1894 info!(elapsed, "Persisted {} events", len);
1895 Ok(())
1896 }
1897
1898 async fn persist_optimistic_events(
1899 &self,
1900 events: Vec<OptimisticEvent>,
1901 ) -> Result<(), IndexerError> {
1902 if events.is_empty() {
1903 return Ok(());
1904 }
1905
1906 self.spawn_blocking_task(move |this| {
1907 persist_chunk_into_table!(optimistic_events::table, events, &this.blocking_cp)
1908 })
1909 .await
1910 .map_err(|e| {
1911 tracing::error!(
1912 "Failed to join persist_chunk_into_table in persist_optimistic_events: {e}"
1913 );
1914 IndexerError::from(e)
1915 })?
1916 .map_err(|e| {
1917 IndexerError::PostgresWrite(format!(
1918 "Failed to persist all optimistic events chunks: {e:?}"
1919 ))
1920 })
1921 }
1922
1923 async fn persist_displays(
1924 &self,
1925 display_updates: BTreeMap<String, StoredDisplay>,
1926 ) -> Result<(), IndexerError> {
1927 if display_updates.is_empty() {
1928 return Ok(());
1929 }
1930
1931 self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
1932 .await?
1933 }
1934
1935 async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1936 if packages.is_empty() {
1937 return Ok(());
1938 }
1939 self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
1940 .await
1941 }
1942
1943 async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
1944 if indices.is_empty() {
1945 return Ok(());
1946 }
1947 let len = indices.len();
1948 let guard = self
1949 .metrics
1950 .checkpoint_db_commit_latency_event_indices
1951 .start_timer();
1952 let chunks = chunk!(indices, self.config.parallel_chunk_size);
1953
1954 let futures = chunks.into_iter().map(|chunk| {
1955 self.spawn_task(move |this: Self| async move {
1956 this.persist_event_indices_chunk(chunk).await
1957 })
1958 });
1959
1960 futures::future::try_join_all(futures)
1961 .await
1962 .map_err(|e| {
1963 tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e);
1964 IndexerError::from(e)
1965 })?
1966 .into_iter()
1967 .collect::<Result<Vec<_>, _>>()
1968 .map_err(|e| {
1969 IndexerError::PostgresWrite(format!(
1970 "Failed to persist all event_indices chunks: {e:?}"
1971 ))
1972 })?;
1973 let elapsed = guard.stop_and_record();
1974 info!(elapsed, "Persisted {} event_indices chunks", len);
1975 Ok(())
1976 }
1977
1978 async fn persist_optimistic_event_indices(
1979 &self,
1980 indices: OptimisticEventIndices,
1981 ) -> Result<(), IndexerError> {
1982 let OptimisticEventIndices {
1983 optimistic_event_emit_packages,
1984 optimistic_event_emit_modules,
1985 optimistic_event_senders,
1986 optimistic_event_struct_packages,
1987 optimistic_event_struct_modules,
1988 optimistic_event_struct_names,
1989 optimistic_event_struct_instantiations,
1990 } = indices;
1991
1992 let mut futures = vec![];
1993 futures.push(self.spawn_blocking_task(move |this| {
1994 persist_chunk_into_table!(
1995 optimistic_event_emit_package::table,
1996 optimistic_event_emit_packages,
1997 &this.blocking_cp
1998 )
1999 }));
2000
2001 futures.push(self.spawn_blocking_task(move |this| {
2002 persist_chunk_into_table!(
2003 optimistic_event_emit_module::table,
2004 optimistic_event_emit_modules,
2005 &this.blocking_cp
2006 )
2007 }));
2008
2009 futures.push(self.spawn_blocking_task(move |this| {
2010 persist_chunk_into_table!(
2011 optimistic_event_senders::table,
2012 optimistic_event_senders,
2013 &this.blocking_cp
2014 )
2015 }));
2016
2017 futures.push(self.spawn_blocking_task(move |this| {
2018 persist_chunk_into_table!(
2019 optimistic_event_struct_package::table,
2020 optimistic_event_struct_packages,
2021 &this.blocking_cp
2022 )
2023 }));
2024
2025 futures.push(self.spawn_blocking_task(move |this| {
2026 persist_chunk_into_table!(
2027 optimistic_event_struct_module::table,
2028 optimistic_event_struct_modules,
2029 &this.blocking_cp
2030 )
2031 }));
2032
2033 futures.push(self.spawn_blocking_task(move |this| {
2034 persist_chunk_into_table!(
2035 optimistic_event_struct_name::table,
2036 optimistic_event_struct_names,
2037 &this.blocking_cp
2038 )
2039 }));
2040
2041 futures.push(self.spawn_blocking_task(move |this| {
2042 persist_chunk_into_table!(
2043 optimistic_event_struct_instantiation::table,
2044 optimistic_event_struct_instantiations,
2045 &this.blocking_cp
2046 )
2047 }));
2048
2049 futures::future::try_join_all(futures)
2050 .await
2051 .map_err(|e| {
2052 tracing::error!("Failed to join optimistic event indices futures: {e}");
2053 IndexerError::from(e)
2054 })?
2055 .into_iter()
2056 .collect::<Result<Vec<_>, _>>()
2057 .map_err(|e| {
2058 IndexerError::PostgresWrite(format!(
2059 "Failed to persist all optimistic event indices: {e:?}",
2060 ))
2061 })?;
2062 info!("Persisted optimistic event indices");
2063 Ok(())
2064 }
2065
2066 async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2067 if indices.is_empty() {
2068 return Ok(());
2069 }
2070 let len = indices.len();
2071 let guard = self
2072 .metrics
2073 .checkpoint_db_commit_latency_tx_indices
2074 .start_timer();
2075 let chunks = chunk!(indices, self.config.parallel_chunk_size);
2076
2077 let futures = chunks.into_iter().map(|chunk| {
2078 self.spawn_task(
2079 move |this: Self| async move { this.persist_tx_indices_chunk(chunk).await },
2080 )
2081 });
2082 futures::future::try_join_all(futures)
2083 .await
2084 .map_err(|e| {
2085 tracing::error!("Failed to join persist_tx_indices_chunk futures: {}", e);
2086 IndexerError::from(e)
2087 })?
2088 .into_iter()
2089 .collect::<Result<Vec<_>, _>>()
2090 .map_err(|e| {
2091 IndexerError::PostgresWrite(format!(
2092 "Failed to persist all tx_indices chunks: {e:?}"
2093 ))
2094 })?;
2095 let elapsed = guard.stop_and_record();
2096 info!(elapsed, "Persisted {} tx_indices chunks", len);
2097 Ok(())
2098 }
2099
2100 async fn persist_optimistic_tx_indices(
2101 &self,
2102 indices: OptimisticTxIndices,
2103 ) -> Result<(), IndexerError> {
2104 let OptimisticTxIndices {
2105 optimistic_tx_senders: senders,
2106 optimistic_tx_recipients: recipients,
2107 optimistic_tx_input_objects: input_objects,
2108 optimistic_tx_changed_objects: changed_objects,
2109 optimistic_tx_pkgs: pkgs,
2110 optimistic_tx_mods: mods,
2111 optimistic_tx_funs: funs,
2112 optimistic_tx_kinds: kinds,
2113 } = indices;
2114
2115 let mut futures = vec![];
2116 futures.push(self.spawn_blocking_task(move |this| {
2117 persist_chunk_into_table!(optimistic_tx_senders::table, senders, &this.blocking_cp)
2118 }));
2119
2120 futures.push(self.spawn_blocking_task(move |this| {
2121 persist_chunk_into_table!(
2122 optimistic_tx_recipients::table,
2123 recipients,
2124 &this.blocking_cp
2125 )
2126 }));
2127
2128 futures.push(self.spawn_blocking_task(move |this| {
2129 persist_chunk_into_table!(
2130 optimistic_tx_input_objects::table,
2131 input_objects,
2132 &this.blocking_cp
2133 )
2134 }));
2135
2136 futures.push(self.spawn_blocking_task(move |this| {
2137 persist_chunk_into_table!(
2138 optimistic_tx_changed_objects::table,
2139 changed_objects,
2140 &this.blocking_cp
2141 )
2142 }));
2143
2144 futures.push(self.spawn_blocking_task(move |this| {
2145 persist_chunk_into_table!(optimistic_tx_calls_pkg::table, pkgs, &this.blocking_cp)
2146 }));
2147
2148 futures.push(self.spawn_blocking_task(move |this| {
2149 persist_chunk_into_table!(optimistic_tx_calls_mod::table, mods, &this.blocking_cp)
2150 }));
2151
2152 futures.push(self.spawn_blocking_task(move |this| {
2153 persist_chunk_into_table!(optimistic_tx_calls_fun::table, funs, &this.blocking_cp)
2154 }));
2155
2156 futures.push(self.spawn_blocking_task(move |this| {
2157 persist_chunk_into_table!(optimistic_tx_kinds::table, kinds, &this.blocking_cp)
2158 }));
2159
2160 futures::future::try_join_all(futures)
2161 .await
2162 .map_err(|e| {
2163 tracing::error!("Failed to join optimistic tx indices futures: {e}");
2164 IndexerError::from(e)
2165 })?
2166 .into_iter()
2167 .collect::<Result<Vec<_>, _>>()
2168 .map_err(|e| {
2169 IndexerError::PostgresWrite(format!(
2170 "Failed to persist all optimistic tx indices: {e:?}",
2171 ))
2172 })?;
2173 info!("Persisted optimistic tx indices");
2174 Ok(())
2175 }
2176
2177 async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2178 self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2179 .await
2180 }
2181
2182 async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2183 self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2184 .await
2185 }
2186
2187 async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
2188 let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch)? {
2189 (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
2190 _ => Err(IndexerError::PostgresRead(format!(
2191 "Failed to get checkpoint range for epoch {epoch}"
2192 ))),
2193 }?;
2194
2195 let min_prunable_cp = self.get_min_prunable_checkpoint()?;
2200 min_cp = std::cmp::max(min_cp, min_prunable_cp);
2201 for cp in min_cp..=max_cp {
2202 info!(
2214 "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
2215 cp, epoch, min_prunable_cp
2216 );
2217 self.execute_in_blocking_worker(move |this| this.prune_checkpoints_table(cp))
2218 .await
2219 .unwrap_or_else(|e| {
2220 tracing::error!("Failed to prune checkpoint {}: {}", cp, e);
2221 });
2222
2223 let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp)?;
2224 self.execute_in_blocking_worker(move |this| {
2225 this.prune_tx_indices_table(min_tx, max_tx)
2226 })
2227 .await
2228 .unwrap_or_else(|e| {
2229 tracing::error!("Failed to prune transactions for cp {}: {}", cp, e);
2230 });
2231 info!(
2232 "Pruned transactions for checkpoint {} from tx {} to tx {}",
2233 cp, min_tx, max_tx
2234 );
2235 self.execute_in_blocking_worker(move |this| {
2236 this.prune_event_indices_table(min_tx, max_tx)
2237 })
2238 .await
2239 .unwrap_or_else(|e| {
2240 tracing::error!(
2241 "Failed to prune events of transactions for cp {}: {}",
2242 cp,
2243 e
2244 );
2245 });
2246 info!(
2247 "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
2248 cp, min_tx, max_tx
2249 );
2250 self.metrics.last_pruned_transaction.set(max_tx as i64);
2251
2252 self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
2253 .await
2254 .unwrap_or_else(|e| {
2255 tracing::error!(
2256 "Failed to prune pruner_cp_watermark table for cp {}: {}",
2257 cp,
2258 e
2259 );
2260 });
2261 info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2262 self.metrics.last_pruned_checkpoint.set(cp as i64);
2263 }
2264
2265 Ok(())
2266 }
2267
2268 async fn get_network_total_transactions_by_end_of_epoch(
2269 &self,
2270 epoch: u64,
2271 ) -> Result<u64, IndexerError> {
2272 self.execute_in_blocking_worker(move |this| {
2273 this.get_network_total_transactions_by_end_of_epoch(epoch)
2274 })
2275 .await
2276 }
2277
2278 async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2279 self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2280 .await
2281 }
2282
/// Returns this store as a `&dyn StdAny` trait object.
///
/// NOTE(review): presumably exposed so callers holding a trait object can
/// downcast back to the concrete store type; confirm against call sites.
fn as_any(&self) -> &dyn StdAny {
    self
}
2286
2287 fn persist_protocol_configs_and_feature_flags(
2290 &self,
2291 chain_id: Vec<u8>,
2292 ) -> Result<(), IndexerError> {
2293 let chain_id = ChainIdentifier::from(
2294 CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
2295 );
2296
2297 let mut all_configs = vec![];
2298 let mut all_flags = vec![];
2299
2300 let (start_version, end_version) = self.get_protocol_version_index_range()?;
2301 info!(
2302 "Persisting protocol configs with start_version: {}, end_version: {}",
2303 start_version, end_version
2304 );
2305
2306 for version in start_version..=end_version {
2309 let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2310 (version as u64).into(),
2311 chain_id.chain(),
2312 )
2313 .ok_or(IndexerError::Generic(format!(
2314 "Unable to fetch protocol version {} and chain {:?}",
2315 version,
2316 chain_id.chain()
2317 )))?;
2318 let configs_vec = protocol_configs
2319 .attr_map()
2320 .into_iter()
2321 .map(|(k, v)| StoredProtocolConfig {
2322 protocol_version: version,
2323 config_name: k,
2324 config_value: v.map(|v| v.to_string()),
2325 })
2326 .collect::<Vec<_>>();
2327 all_configs.extend(configs_vec);
2328
2329 let feature_flags = protocol_configs
2330 .feature_map()
2331 .into_iter()
2332 .map(|(k, v)| StoredFeatureFlag {
2333 protocol_version: version,
2334 flag_name: k,
2335 flag_value: v,
2336 })
2337 .collect::<Vec<_>>();
2338 all_flags.extend(feature_flags);
2339 }
2340
2341 transactional_blocking_with_retry!(
2342 &self.blocking_cp,
2343 |conn| {
2344 for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2345 insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2346 }
2347 insert_or_ignore_into!(feature_flags::table, all_flags.clone(), conn);
2348 Ok::<(), IndexerError>(())
2349 },
2350 PG_DB_COMMIT_SLEEP_DURATION
2351 )?;
2352 Ok(())
2353 }
2354}
2355
2356fn make_objects_history_to_commit(
2357 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2358) -> Vec<StoredHistoryObject> {
2359 let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2360 .clone()
2361 .into_iter()
2362 .flat_map(|changes| changes.deleted_objects)
2363 .map(|o| o.into())
2364 .collect();
2365 let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2366 .into_iter()
2367 .flat_map(|changes| changes.changed_objects)
2368 .map(|o| o.into())
2369 .collect();
2370 deleted_objects.into_iter().chain(mutated_objects).collect()
2371}
2372
2373fn retain_latest_indexed_objects(
2379 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2380) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2381 let (mutations, deletions) = tx_object_changes
2384 .into_iter()
2385 .flat_map(|change| {
2386 change
2387 .changed_objects
2388 .into_iter()
2389 .map(Either::Left)
2390 .chain(
2391 change
2392 .deleted_objects
2393 .into_iter()
2394 .map(Either::Right),
2395 )
2396 })
2397 .fold(
2398 (HashMap::<ObjectID, IndexedObject>::new(), HashMap::<ObjectID, IndexedDeletedObject>::new()),
2399 |(mut mutations, mut deletions), either_change| {
2400 match either_change {
2401 Either::Left(mutation) => {
2405 let id = mutation.object.id();
2406 let mutation_version = mutation.object.version();
2407 if let Some(existing) = deletions.remove(&id) {
2408 assert!(
2409 existing.object_version < mutation_version.value(),
2410 "Mutation version ({mutation_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2411 existing.object_version
2412 );
2413 }
2414 if let Some(existing) = mutations.insert(id, mutation) {
2415 assert!(
2416 existing.object.version() < mutation_version,
2417 "Mutation version ({mutation_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2418 existing.object.version()
2419 );
2420 }
2421 }
2422 Either::Right(deletion) => {
2423 let id = deletion.object_id;
2424 let deletion_version = deletion.object_version;
2425 if let Some(existing) = mutations.remove(&id) {
2426 assert!(
2427 existing.object.version().value() < deletion_version,
2428 "Deletion version ({deletion_version:?}) should be greater than existing mutation version ({:?}) for object {id:?}",
2429 existing.object.version(),
2430 );
2431 }
2432 if let Some(existing) = deletions.insert(id, deletion) {
2433 assert!(
2434 existing.object_version < deletion_version,
2435 "Deletion version ({deletion_version:?}) should be greater than existing deletion version ({:?}) for object {id:?}",
2436 existing.object_version
2437 );
2438 }
2439 }
2440 }
2441 (mutations, deletions)
2442 },
2443 );
2444 (
2445 mutations.into_values().collect(),
2446 deletions.into_values().collect(),
2447 )
2448}