1use core::result::Result::Ok;
6use std::{
7 any::Any as StdAny,
8 collections::{BTreeMap, HashMap},
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use diesel::{
14 ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl,
15 dsl::{max, min, sql},
16 sql_types::{Array, BigInt, Bytea, Nullable, SmallInt, Text},
17 upsert::excluded,
18};
19use downcast::Any;
20use iota_protocol_config::ProtocolConfig;
21use iota_sdk_types::ObjectId;
22use iota_types::digests::{ChainIdentifier, CheckpointDigest};
23use itertools::Itertools;
24use strum::IntoEnumIterator;
25use tap::TapFallible;
26use tracing::info;
27
28use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
29use crate::{
30 blocking_call_is_ok_or_panic,
31 db::ConnectionPool,
32 errors::{Context, IndexerError},
33 ingestion::{
34 common::{
35 persist::CommitterWatermark,
36 prepare::{
37 CheckpointObjectChanges, LiveObject, RemovedObject,
38 retain_latest_objects_from_checkpoint_batch,
39 },
40 },
41 primary::persist::{EpochToCommit, TransactionObjectChangesToCommit},
42 },
43 insert_or_ignore_into,
44 metrics::IndexerMetrics,
45 models::{
46 checkpoints::{StoredChainIdentifier, StoredCheckpoint, StoredCpTx},
47 display::StoredDisplay,
48 epoch::{StoredEpochInfo, StoredFeatureFlag, StoredProtocolConfig},
49 events::StoredEvent,
50 obj_indices::StoredObjectVersion,
51 objects::{
52 StoredBackwardHistoryObject, StoredCheckpointedObject, StoredDeletedObject,
53 StoredObject, StoredObjects,
54 },
55 packages::StoredPackage,
56 transactions::{OptimisticTransaction, StoredTransaction, TxGlobalOrder},
57 tx_indices::TxIndexSplit,
58 watermarks::StoredWatermark,
59 },
60 on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
61 persist_chunk_into_table_in_existing_connection,
62 pruning::pruner::PrunableTable,
63 read_only_blocking, run_query, run_query_with_retry,
64 schema::{
65 chain_identifier, checkpointed_objects, checkpoints, display, epochs, event_emit_module,
66 event_emit_package, event_senders, event_struct_instantiation, event_struct_module,
67 event_struct_name, event_struct_package, events, feature_flags, objects,
68 objects_backward_history, objects_version, optimistic_transactions, packages,
69 protocol_configs, pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod,
70 tx_calls_pkg, tx_changed_objects, tx_digests, tx_global_order, tx_input_objects, tx_kinds,
71 tx_recipients, tx_senders, tx_wrapped_or_deleted_objects, watermarks,
72 },
73 store::{IndexerStore, diesel_macro::mark_in_blocking_pool},
74 transactional_blocking_with_retry,
75 types::{
76 EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
77 IndexedPackage, IndexedTransaction, TxIndex,
78 },
79};
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84pub struct TxGlobalOrderCursor {
85 pub global_sequence_number: i64,
86 pub optimistic_sequence_number: i64,
87}
88
89#[macro_export]
90macro_rules! chunk {
91 ($data: expr, $size: expr) => {{
92 $data
93 .into_iter()
94 .chunks($size)
95 .into_iter()
96 .map(|c| c.collect())
97 .collect::<Vec<Vec<_>>>()
98 }};
99}
100
101macro_rules! prune_tx_or_event_indice_table {
102 ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => {
103 diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx)))
104 .execute($conn)
105 .map_err(IndexerError::from)
106 .context($context_msg)?;
107 };
108}
109
110const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1000;
115const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
117const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
121const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
122
123#[derive(Clone)]
124pub struct PgIndexerStoreConfig {
125 pub parallel_chunk_size: usize,
126 pub parallel_objects_chunk_size: usize,
127}
128
129pub struct PgIndexerStore {
130 blocking_cp: ConnectionPool,
131 metrics: IndexerMetrics,
132 partition_manager: PgPartitionManager,
133 config: PgIndexerStoreConfig,
134}
135
136impl Clone for PgIndexerStore {
137 fn clone(&self) -> PgIndexerStore {
138 Self {
139 blocking_cp: self.blocking_cp.clone(),
140 metrics: self.metrics.clone(),
141 partition_manager: self.partition_manager.clone(),
142 config: self.config.clone(),
143 }
144 }
145}
146
147impl PgIndexerStore {
148 pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
149 let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
150 .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
151 .parse::<usize>()
152 .unwrap();
153 let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
154 .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
155 .parse::<usize>()
156 .unwrap();
157 let partition_manager = PgPartitionManager::new(blocking_cp.clone())
158 .expect("failed to initialize partition manager");
159 let config = PgIndexerStoreConfig {
160 parallel_chunk_size,
161 parallel_objects_chunk_size,
162 };
163
164 Self {
165 blocking_cp,
166 metrics,
167 partition_manager,
168 config,
169 }
170 }
171
172 pub fn get_metrics(&self) -> IndexerMetrics {
173 self.metrics.clone()
174 }
175
176 pub fn blocking_cp(&self) -> ConnectionPool {
177 self.blocking_cp.clone()
178 }
179
180 pub fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
182 let start = read_only_blocking!(&self.blocking_cp, |conn| {
185 protocol_configs::dsl::protocol_configs
186 .select(max(protocol_configs::protocol_version))
187 .first::<Option<i64>>(conn)
188 })
189 .context("Failed reading latest protocol version from PostgresDB")?
190 .map_or(1, |v| v + 1);
191
192 let end = read_only_blocking!(&self.blocking_cp, |conn| {
194 epochs::dsl::epochs
195 .select(max(epochs::protocol_version))
196 .first::<Option<i64>>(conn)
197 })
198 .context("Failed reading latest epoch protocol version from PostgresDB")?
199 .unwrap_or(1);
200 Ok((start, end))
201 }
202
203 pub fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
204 read_only_blocking!(&self.blocking_cp, |conn| {
205 chain_identifier::dsl::chain_identifier
206 .select(chain_identifier::checkpoint_digest)
207 .first::<Vec<u8>>(conn)
208 .optional()
209 })
210 .context("Failed reading chain id from PostgresDB")
211 }
212
213 fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
214 read_only_blocking!(&self.blocking_cp, |conn| {
215 checkpoints::dsl::checkpoints
216 .select(max(checkpoints::sequence_number))
217 .first::<Option<i64>>(conn)
218 .map(|v| v.map(|v| v as u64))
219 })
220 .context("Failed reading latest checkpoint sequence number from PostgresDB")
221 }
222
223 fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
224 read_only_blocking!(&self.blocking_cp, |conn| {
225 checkpoints::dsl::checkpoints
226 .select((
227 min(checkpoints::sequence_number),
228 max(checkpoints::sequence_number),
229 ))
230 .first::<(Option<i64>, Option<i64>)>(conn)
231 .map(|(min, max)| {
232 (
233 min.unwrap_or_default() as u64,
234 max.unwrap_or_default() as u64,
235 )
236 })
237 })
238 .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
239 }
240
241 fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
242 read_only_blocking!(&self.blocking_cp, |conn| {
243 epochs::dsl::epochs
244 .select((min(epochs::epoch), max(epochs::epoch)))
245 .first::<(Option<i64>, Option<i64>)>(conn)
246 .map(|(min, max)| {
247 (
248 min.unwrap_or_default() as u64,
249 max.unwrap_or_default() as u64,
250 )
251 })
252 })
253 .context("Failed reading min and max epoch numbers from PostgresDB")
254 }
255
256 fn persist_display_updates(
257 &self,
258 display_updates: BTreeMap<String, StoredDisplay>,
259 ) -> Result<(), IndexerError> {
260 transactional_blocking_with_retry!(
261 &self.blocking_cp,
262 {
263 let value = display_updates.values().collect::<Vec<_>>();
264 |conn| self.persist_displays_in_existing_transaction(conn, value)
265 },
266 PG_DB_COMMIT_SLEEP_DURATION
267 )?;
268
269 Ok(())
270 }
271
272 fn persist_changed_objects(&self, objects: Vec<LiveObject>) -> Result<(), IndexerError> {
273 let guard = self
274 .metrics
275 .checkpoint_db_commit_latency_objects_chunks
276 .start_timer();
277 let len = objects.len();
278 let raw_query = r#"
279 INSERT INTO objects (
280 object_id,
281 object_version,
282 object_digest,
283 owner_type,
284 owner_id,
285 object_type,
286 object_type_package,
287 object_type_module,
288 object_type_name,
289 serialized_object,
290 coin_type,
291 coin_balance,
292 df_kind,
293 finalized_in_cp
294 )
295 SELECT
296 u.object_id,
297 u.object_version,
298 u.object_digest,
299 u.owner_type,
300 u.owner_id,
301 u.object_type,
302 u.object_type_package,
303 u.object_type_module,
304 u.object_type_name,
305 u.serialized_object,
306 u.coin_type,
307 u.coin_balance,
308 u.df_kind,
309 u.finalized_in_cp
310 FROM UNNEST(
311 $1::BYTEA[],
312 $2::BIGINT[],
313 $3::BYTEA[],
314 $4::SMALLINT[],
315 $5::BYTEA[],
316 $6::TEXT[],
317 $7::BYTEA[],
318 $8::TEXT[],
319 $9::TEXT[],
320 $10::BYTEA[],
321 $11::TEXT[],
322 $12::BIGINT[],
323 $13::SMALLINT[],
324 $14::BYTEA[],
325 $15::BIGINT[]
326 ) AS u(object_id, object_version, object_digest, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, tx_digest, finalized_in_cp)
327 LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
328 WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
329 ON CONFLICT (object_id) DO UPDATE
330 SET
331 object_version = EXCLUDED.object_version,
332 object_digest = EXCLUDED.object_digest,
333 owner_type = EXCLUDED.owner_type,
334 owner_id = EXCLUDED.owner_id,
335 object_type = EXCLUDED.object_type,
336 object_type_package = EXCLUDED.object_type_package,
337 object_type_module = EXCLUDED.object_type_module,
338 object_type_name = EXCLUDED.object_type_name,
339 serialized_object = EXCLUDED.serialized_object,
340 coin_type = EXCLUDED.coin_type,
341 coin_balance = EXCLUDED.coin_balance,
342 df_kind = EXCLUDED.df_kind,
343 finalized_in_cp = EXCLUDED.finalized_in_cp
344 WHERE EXCLUDED.object_version > objects.object_version
345 "#;
346 let (objects, tx_digests): (StoredObjects, Vec<_>) = objects
347 .into_iter()
348 .map(LiveObject::split)
349 .map(|(indexed_object, tx_digest)| {
350 (
351 StoredObject::from(indexed_object),
352 tx_digest.into_inner().to_vec(),
353 )
354 })
355 .unzip();
356 let query = diesel::sql_query(raw_query)
357 .bind::<Array<Bytea>, _>(objects.object_ids)
358 .bind::<Array<BigInt>, _>(objects.object_versions)
359 .bind::<Array<Bytea>, _>(objects.object_digests)
360 .bind::<Array<SmallInt>, _>(objects.owner_types)
361 .bind::<Array<Nullable<Bytea>>, _>(objects.owner_ids)
362 .bind::<Array<Nullable<Text>>, _>(objects.object_types)
363 .bind::<Array<Nullable<Bytea>>, _>(objects.object_type_packages)
364 .bind::<Array<Nullable<Text>>, _>(objects.object_type_modules)
365 .bind::<Array<Nullable<Text>>, _>(objects.object_type_names)
366 .bind::<Array<Bytea>, _>(objects.serialized_objects)
367 .bind::<Array<Nullable<Text>>, _>(objects.coin_types)
368 .bind::<Array<Nullable<BigInt>>, _>(objects.coin_balances)
369 .bind::<Array<Nullable<SmallInt>>, _>(objects.df_kinds)
370 .bind::<Array<Bytea>, _>(tx_digests)
371 .bind::<Array<Nullable<BigInt>>, _>(objects.finalized_in_cps);
372 transactional_blocking_with_retry!(
373 &self.blocking_cp,
374 |conn| {
375 query.clone().execute(conn)?;
376 Ok::<(), IndexerError>(())
377 },
378 PG_DB_COMMIT_SLEEP_DURATION
379 )
380 .tap_ok(|_| {
381 let elapsed = guard.stop_and_record();
382 info!(elapsed, "Persisted {len} chunked objects");
383 })
384 .tap_err(|e| {
385 tracing::error!("failed to persist object mutations with error: {e}");
386 })
387 }
388
389 fn persist_removed_objects(&self, objects: Vec<RemovedObject>) -> Result<(), IndexerError> {
390 let guard = self
391 .metrics
392 .checkpoint_db_commit_latency_objects_chunks
393 .start_timer();
394 let len = objects.len();
395 let raw_query = r#"
396 DELETE FROM objects
397 USING (
398 SELECT u.object_id, u.object_version
399 FROM UNNEST(
400 $1::BYTEA[],
401 $2::BIGINT[],
402 $3::BYTEA[]
403 ) AS u(object_id, object_version, tx_digest)
404 LEFT JOIN tx_global_order o ON o.tx_digest = u.tx_digest
405 WHERE o.optimistic_sequence_number IS NULL OR o.optimistic_sequence_number = -1
406 ) AS to_delete
407 WHERE objects.object_id = to_delete.object_id
408 AND objects.object_version < to_delete.object_version
409 "#;
410 let (object_ids, versions, tx_digests): (Vec<_>, Vec<_>, Vec<_>) = objects
411 .into_iter()
412 .map(|removed_object| {
413 (
414 removed_object.object_id().as_bytes().to_vec(),
415 removed_object.version() as i64,
416 removed_object.transaction_digest.into_inner().to_vec(),
417 )
418 })
419 .multiunzip();
420 let query = diesel::sql_query(raw_query)
421 .bind::<Array<Bytea>, _>(object_ids)
422 .bind::<Array<BigInt>, _>(versions)
423 .bind::<Array<Bytea>, _>(tx_digests);
424 transactional_blocking_with_retry!(
425 &self.blocking_cp,
426 |conn| {
427 query.clone().execute(conn)?;
428 Ok::<(), IndexerError>(())
429 },
430 PG_DB_COMMIT_SLEEP_DURATION
431 )
432 .tap_ok(|_| {
433 let elapsed = guard.stop_and_record();
434 info!(elapsed, "Deleted {len} chunked objects");
435 })
436 .tap_err(|e| {
437 tracing::error!("failed to persist object deletions with error: {e}");
438 })
439 }
440
441 fn persist_checkpointed_objects_chunk(
442 &self,
443 objects: Vec<StoredCheckpointedObject>,
444 ) -> Result<(), IndexerError> {
445 use diesel::upsert::excluded;
446
447 let len = objects.len();
448 transactional_blocking_with_retry!(
449 &self.blocking_cp,
450 |conn| {
451 for chunk in objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
452 on_conflict_do_update!(
453 checkpointed_objects::table,
454 chunk,
455 checkpointed_objects::object_id,
456 (
457 checkpointed_objects::object_version
458 .eq(excluded(checkpointed_objects::object_version)),
459 checkpointed_objects::object_status
460 .eq(excluded(checkpointed_objects::object_status)),
461 checkpointed_objects::object_digest
462 .eq(excluded(checkpointed_objects::object_digest)),
463 checkpointed_objects::checkpoint_sequence_number
464 .eq(excluded(checkpointed_objects::checkpoint_sequence_number)),
465 checkpointed_objects::owner_type
466 .eq(excluded(checkpointed_objects::owner_type)),
467 checkpointed_objects::owner_id
468 .eq(excluded(checkpointed_objects::owner_id)),
469 checkpointed_objects::object_type
470 .eq(excluded(checkpointed_objects::object_type)),
471 checkpointed_objects::object_type_package
472 .eq(excluded(checkpointed_objects::object_type_package)),
473 checkpointed_objects::object_type_module
474 .eq(excluded(checkpointed_objects::object_type_module)),
475 checkpointed_objects::object_type_name
476 .eq(excluded(checkpointed_objects::object_type_name)),
477 checkpointed_objects::serialized_object
478 .eq(excluded(checkpointed_objects::serialized_object)),
479 checkpointed_objects::coin_type
480 .eq(excluded(checkpointed_objects::coin_type)),
481 checkpointed_objects::coin_balance
482 .eq(excluded(checkpointed_objects::coin_balance)),
483 checkpointed_objects::df_kind
484 .eq(excluded(checkpointed_objects::df_kind)),
485 ),
486 conn
487 );
488 }
489 Ok::<(), IndexerError>(())
490 },
491 PG_DB_COMMIT_SLEEP_DURATION
492 )
493 .tap_ok(|_| {
494 info!("Persisted {len} checkpointed objects");
495 })
496 .tap_err(|e| {
497 tracing::error!("failed to persist checkpointed objects: {e}");
498 })
499 }
500
501 fn persist_object_mutation_chunk_in_existing_transaction(
502 &self,
503 conn: &mut PgConnection,
504 mutated_object_mutation_chunk: Vec<StoredObject>,
505 ) -> Result<(), IndexerError> {
506 on_conflict_do_update_with_condition!(
507 objects::table,
508 mutated_object_mutation_chunk,
509 objects::object_id,
510 (
511 objects::object_id.eq(excluded(objects::object_id)),
512 objects::object_version.eq(excluded(objects::object_version)),
513 objects::object_digest.eq(excluded(objects::object_digest)),
514 objects::owner_type.eq(excluded(objects::owner_type)),
515 objects::owner_id.eq(excluded(objects::owner_id)),
516 objects::object_type.eq(excluded(objects::object_type)),
517 objects::serialized_object.eq(excluded(objects::serialized_object)),
518 objects::coin_type.eq(excluded(objects::coin_type)),
519 objects::coin_balance.eq(excluded(objects::coin_balance)),
520 objects::df_kind.eq(excluded(objects::df_kind)),
521 objects::finalized_in_cp.eq(excluded(objects::finalized_in_cp)),
522 ),
523 excluded(objects::object_version).gt(objects::object_version),
524 conn
525 );
526 Ok::<(), IndexerError>(())
527 }
528
529 fn persist_object_deletion_chunk_in_existing_transaction(
530 &self,
531 conn: &mut PgConnection,
532 deleted_objects_chunk: Vec<StoredDeletedObject>,
533 ) -> Result<(), IndexerError> {
534 let (object_ids, object_versions): (Vec<_>, Vec<_>) = deleted_objects_chunk
535 .iter()
536 .map(|o| (o.object_id.clone(), o.object_version))
537 .unzip();
538 let raw_query = r#"
539 DELETE FROM objects
540 USING UNNEST($1::BYTEA[], $2::BIGINT[]) AS to_delete(object_id, object_version)
541 WHERE objects.object_id = to_delete.object_id
542 AND objects.object_version < to_delete.object_version
543 "#;
544 diesel::sql_query(raw_query)
545 .bind::<Array<Bytea>, _>(object_ids)
546 .bind::<Array<BigInt>, _>(object_versions)
547 .execute(conn)
548 .map_err(IndexerError::from)
549 .context("Failed to write object deletion to PostgresDB")?;
550 Ok::<(), IndexerError>(())
551 }
552
553 fn persist_objects_backward_history_chunk(
554 &self,
555 stored: Vec<StoredBackwardHistoryObject>,
556 ) -> Result<(), IndexerError> {
557 persist_chunk_into_table!(objects_backward_history::table, stored, &self.blocking_cp)
558 }
559
560 fn persist_object_version_chunk(
561 &self,
562 object_versions: Vec<StoredObjectVersion>,
563 ) -> Result<(), IndexerError> {
564 let guard = self
565 .metrics
566 .checkpoint_db_commit_latency_objects_version_chunks
567 .start_timer();
568
569 transactional_blocking_with_retry!(
570 &self.blocking_cp,
571 |conn| {
572 for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
573 {
574 insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
575 }
576 Ok::<(), IndexerError>(())
577 },
578 PG_DB_COMMIT_SLEEP_DURATION
579 )
580 .tap_ok(|_| {
581 let elapsed = guard.stop_and_record();
582 info!(
583 elapsed,
584 "Persisted {} chunked object versions",
585 object_versions.len(),
586 );
587 })
588 .tap_err(|e| {
589 tracing::error!("failed to persist object versions with error: {e}");
590 })
591 }
592
593 fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
594 let Some(first_checkpoint) = checkpoints.first() else {
595 return Ok(());
596 };
597
598 if first_checkpoint.sequence_number == 0 {
601 let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
602 self.persist_protocol_configs_and_feature_flags(checkpoint_digest)?;
603 transactional_blocking_with_retry!(
604 &self.blocking_cp,
605 |conn| {
606 let checkpoint_digest =
607 first_checkpoint.checkpoint_digest.into_inner().to_vec();
608 insert_or_ignore_into!(
609 chain_identifier::table,
610 StoredChainIdentifier { checkpoint_digest },
611 conn
612 );
613 Ok::<(), IndexerError>(())
614 },
615 PG_DB_COMMIT_SLEEP_DURATION
616 )?;
617 }
618 let guard = self
619 .metrics
620 .checkpoint_db_commit_latency_checkpoints
621 .start_timer();
622
623 let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
624 transactional_blocking_with_retry!(
625 &self.blocking_cp,
626 |conn| {
627 for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
628 insert_or_ignore_into!(pruner_cp_watermark::table, stored_cp_tx_chunk, conn);
629 }
630 Ok::<(), IndexerError>(())
631 },
632 PG_DB_COMMIT_SLEEP_DURATION
633 )
634 .tap_ok(|_| {
635 info!(
636 "Persisted {} pruner_cp_watermark rows.",
637 stored_cp_txs.len(),
638 );
639 })
640 .tap_err(|e| {
641 tracing::error!("failed to persist pruner_cp_watermark with error: {e}");
642 })?;
643
644 let stored_checkpoints = checkpoints
645 .iter()
646 .map(StoredCheckpoint::from)
647 .collect::<Vec<_>>();
648 transactional_blocking_with_retry!(
649 &self.blocking_cp,
650 |conn| {
651 for stored_checkpoint_chunk in
652 stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
653 {
654 insert_or_ignore_into!(checkpoints::table, stored_checkpoint_chunk, conn);
655 let time_now_ms = chrono::Utc::now().timestamp_millis();
656 for stored_checkpoint in stored_checkpoint_chunk {
657 self.metrics
658 .db_commit_lag_ms
659 .set(time_now_ms - stored_checkpoint.timestamp_ms);
660 self.metrics.max_committed_checkpoint_sequence_number.set(
661 stored_checkpoint.sequence_number,
662 );
663 self.metrics.committed_checkpoint_timestamp_ms.set(
664 stored_checkpoint.timestamp_ms,
665 );
666 }
667 for stored_checkpoint in stored_checkpoint_chunk {
668 info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
669 }
670 }
671 Ok::<(), IndexerError>(())
672 },
673 PG_DB_COMMIT_SLEEP_DURATION
674 )
675 .tap_ok(|_| {
676 let elapsed = guard.stop_and_record();
677 info!(
678 elapsed,
679 "Persisted {} checkpoints",
680 stored_checkpoints.len()
681 );
682 })
683 .tap_err(|e| {
684 tracing::error!("failed to persist checkpoints with error: {e}");
685 })
686 }
687
688 fn persist_transactions_chunk(
689 &self,
690 transactions: Vec<IndexedTransaction>,
691 ) -> Result<(), IndexerError> {
692 let guard = self
693 .metrics
694 .checkpoint_db_commit_latency_transactions_chunks
695 .start_timer();
696 let transformation_guard = self
697 .metrics
698 .checkpoint_db_commit_latency_transactions_chunks_transformation
699 .start_timer();
700 let transactions = transactions
701 .iter()
702 .map(StoredTransaction::from)
703 .collect::<Vec<_>>();
704 drop(transformation_guard);
705
706 transactional_blocking_with_retry!(
707 &self.blocking_cp,
708 |conn| {
709 for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
710 insert_or_ignore_into!(transactions::table, transaction_chunk, conn);
711 }
712 Ok::<(), IndexerError>(())
713 },
714 PG_DB_COMMIT_SLEEP_DURATION
715 )
716 .tap_ok(|_| {
717 let elapsed = guard.stop_and_record();
718 info!(
719 elapsed,
720 "Persisted {} chunked transactions",
721 transactions.len()
722 );
723 })
724 .tap_err(|e| {
725 tracing::error!("failed to persist transactions with error: {e}");
726 })
727 }
728
729 fn persist_tx_global_order_chunk(
730 &self,
731 tx_order: Vec<TxGlobalOrder>,
732 ) -> Result<(), IndexerError> {
733 let guard = self
734 .metrics
735 .checkpoint_db_commit_latency_tx_insertion_order_chunks
736 .start_timer();
737
738 transactional_blocking_with_retry!(
739 &self.blocking_cp,
740 |conn| {
741 for tx_order_chunk in tx_order.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
742 on_conflict_do_update_with_condition!(
746 tx_global_order::table,
747 tx_order_chunk,
748 tx_global_order::tx_digest,
749 tx_global_order::chk_tx_sequence_number
750 .eq(excluded(tx_global_order::chk_tx_sequence_number)),
751 tx_global_order::chk_tx_sequence_number.is_null(),
752 conn
753 );
754 }
755 Ok::<(), IndexerError>(())
756 },
757 PG_DB_COMMIT_SLEEP_DURATION
758 )
759 .tap_ok(|_| {
760 let elapsed = guard.stop_and_record();
761 info!(
762 elapsed,
763 "Persisted {} chunked txs insertion order",
764 tx_order.len()
765 );
766 })
767 .tap_err(|e| {
768 tracing::error!("failed to persist txs insertion order with error: {e}");
769 })
770 }
771
772 fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
773 let guard = self
774 .metrics
775 .checkpoint_db_commit_latency_events_chunks
776 .start_timer();
777 let len = events.len();
778 let events = events
779 .into_iter()
780 .map(StoredEvent::from)
781 .collect::<Vec<_>>();
782
783 transactional_blocking_with_retry!(
784 &self.blocking_cp,
785 |conn| {
786 for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
787 insert_or_ignore_into!(events::table, event_chunk, conn);
788 }
789 Ok::<(), IndexerError>(())
790 },
791 PG_DB_COMMIT_SLEEP_DURATION
792 )
793 .tap_ok(|_| {
794 let elapsed = guard.stop_and_record();
795 info!(elapsed, "Persisted {} chunked events", len);
796 })
797 .tap_err(|e| {
798 tracing::error!("failed to persist events with error: {e}");
799 })
800 }
801
802 fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
803 if packages.is_empty() {
804 return Ok(());
805 }
806 let guard = self
807 .metrics
808 .checkpoint_db_commit_latency_packages
809 .start_timer();
810 let packages = packages
811 .into_iter()
812 .map(StoredPackage::from)
813 .collect::<Vec<_>>();
814 transactional_blocking_with_retry!(
815 &self.blocking_cp,
816 |conn| {
817 for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
818 on_conflict_do_update!(
819 packages::table,
820 packages_chunk,
821 packages::package_id,
822 (
823 packages::package_id.eq(excluded(packages::package_id)),
824 packages::move_package.eq(excluded(packages::move_package)),
825 ),
826 conn
827 );
828 }
829 Ok::<(), IndexerError>(())
830 },
831 PG_DB_COMMIT_SLEEP_DURATION
832 )
833 .tap_ok(|_| {
834 let elapsed = guard.stop_and_record();
835 info!(elapsed, "Persisted {} packages", packages.len());
836 })
837 .tap_err(|e| {
838 tracing::error!("failed to persist packages with error: {e}");
839 })
840 }
841
842 async fn persist_event_indices_chunk(
843 &self,
844 indices: Vec<EventIndex>,
845 ) -> Result<(), IndexerError> {
846 let guard = self
847 .metrics
848 .checkpoint_db_commit_latency_event_indices_chunks
849 .start_timer();
850 let len = indices.len();
851 let (
852 event_emit_packages,
853 event_emit_modules,
854 event_senders,
855 event_struct_packages,
856 event_struct_modules,
857 event_struct_names,
858 event_struct_instantiations,
859 ) = indices.into_iter().map(|i| i.split()).fold(
860 (
861 Vec::new(),
862 Vec::new(),
863 Vec::new(),
864 Vec::new(),
865 Vec::new(),
866 Vec::new(),
867 Vec::new(),
868 ),
869 |(
870 mut event_emit_packages,
871 mut event_emit_modules,
872 mut event_senders,
873 mut event_struct_packages,
874 mut event_struct_modules,
875 mut event_struct_names,
876 mut event_struct_instantiations,
877 ),
878 index| {
879 event_emit_packages.push(index.0);
880 event_emit_modules.push(index.1);
881 event_senders.push(index.2);
882 event_struct_packages.push(index.3);
883 event_struct_modules.push(index.4);
884 event_struct_names.push(index.5);
885 event_struct_instantiations.push(index.6);
886 (
887 event_emit_packages,
888 event_emit_modules,
889 event_senders,
890 event_struct_packages,
891 event_struct_modules,
892 event_struct_names,
893 event_struct_instantiations,
894 )
895 },
896 );
897
898 let mut futures = vec![];
900 futures.push(self.spawn_blocking_task(move |this| {
901 persist_chunk_into_table!(
902 event_emit_package::table,
903 event_emit_packages,
904 &this.blocking_cp
905 )
906 }));
907
908 futures.push(self.spawn_blocking_task(move |this| {
909 persist_chunk_into_table!(
910 event_emit_module::table,
911 event_emit_modules,
912 &this.blocking_cp
913 )
914 }));
915
916 futures.push(self.spawn_blocking_task(move |this| {
917 persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
918 }));
919
920 futures.push(self.spawn_blocking_task(move |this| {
921 persist_chunk_into_table!(
922 event_struct_package::table,
923 event_struct_packages,
924 &this.blocking_cp
925 )
926 }));
927
928 futures.push(self.spawn_blocking_task(move |this| {
929 persist_chunk_into_table!(
930 event_struct_module::table,
931 event_struct_modules,
932 &this.blocking_cp
933 )
934 }));
935
936 futures.push(self.spawn_blocking_task(move |this| {
937 persist_chunk_into_table!(
938 event_struct_name::table,
939 event_struct_names,
940 &this.blocking_cp
941 )
942 }));
943
944 futures.push(self.spawn_blocking_task(move |this| {
945 persist_chunk_into_table!(
946 event_struct_instantiation::table,
947 event_struct_instantiations,
948 &this.blocking_cp
949 )
950 }));
951
952 futures::future::try_join_all(futures)
953 .await
954 .map_err(|e| {
955 tracing::error!("failed to join event indices futures in a chunk: {e}");
956 IndexerError::from(e)
957 })?
958 .into_iter()
959 .collect::<Result<Vec<_>, _>>()
960 .map_err(|e| {
961 IndexerError::PostgresWrite(format!(
962 "Failed to persist all event indices in a chunk: {e:?}"
963 ))
964 })?;
965 let elapsed = guard.stop_and_record();
966 info!(elapsed, "Persisted {} chunked event indices", len);
967 Ok(())
968 }
969
970 async fn persist_tx_indices_chunk_v2(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
971 let guard = self
972 .metrics
973 .checkpoint_db_commit_latency_tx_indices_chunks
974 .start_timer();
975 let len = indices.len();
976
977 let splits: Vec<TxIndexSplit> = indices.into_iter().map(Into::into).collect();
978
979 let senders: Vec<_> = splits.iter().flat_map(|ix| ix.tx_senders.clone()).collect();
980 let recipients: Vec<_> = splits
981 .iter()
982 .flat_map(|ix| ix.tx_recipients.clone())
983 .collect();
984 let input_objects: Vec<_> = splits
985 .iter()
986 .flat_map(|ix| ix.tx_input_objects.clone())
987 .collect();
988 let changed_objects: Vec<_> = splits
989 .iter()
990 .flat_map(|ix| ix.tx_changed_objects.clone())
991 .collect();
992 let wrapped_or_deleted_objects: Vec<_> = splits
993 .iter()
994 .flat_map(|ix| ix.tx_wrapped_or_deleted_objects.clone())
995 .collect();
996 let pkgs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_pkgs.clone()).collect();
997 let mods: Vec<_> = splits.iter().flat_map(|ix| ix.tx_mods.clone()).collect();
998 let funs: Vec<_> = splits.iter().flat_map(|ix| ix.tx_funs.clone()).collect();
999 let digests: Vec<_> = splits.iter().flat_map(|ix| ix.tx_digests.clone()).collect();
1000 let kinds: Vec<_> = splits.iter().flat_map(|ix| ix.tx_kinds.clone()).collect();
1001
1002 let futures = [
1003 self.spawn_blocking_task(move |this| {
1004 persist_chunk_into_table!(tx_senders::table, senders, &this.blocking_cp)
1005 }),
1006 self.spawn_blocking_task(move |this| {
1007 persist_chunk_into_table!(tx_recipients::table, recipients, &this.blocking_cp)
1008 }),
1009 self.spawn_blocking_task(move |this| {
1010 persist_chunk_into_table!(tx_input_objects::table, input_objects, &this.blocking_cp)
1011 }),
1012 self.spawn_blocking_task(move |this| {
1013 persist_chunk_into_table!(
1014 tx_changed_objects::table,
1015 changed_objects,
1016 &this.blocking_cp
1017 )
1018 }),
1019 self.spawn_blocking_task(move |this| {
1020 persist_chunk_into_table!(
1021 tx_wrapped_or_deleted_objects::table,
1022 wrapped_or_deleted_objects,
1023 &this.blocking_cp
1024 )
1025 }),
1026 self.spawn_blocking_task(move |this| {
1027 persist_chunk_into_table!(tx_calls_pkg::table, pkgs, &this.blocking_cp)
1028 }),
1029 self.spawn_blocking_task(move |this| {
1030 persist_chunk_into_table!(tx_calls_mod::table, mods, &this.blocking_cp)
1031 }),
1032 self.spawn_blocking_task(move |this| {
1033 persist_chunk_into_table!(tx_calls_fun::table, funs, &this.blocking_cp)
1034 }),
1035 self.spawn_blocking_task(move |this| {
1036 persist_chunk_into_table!(tx_digests::table, digests, &this.blocking_cp)
1037 }),
1038 self.spawn_blocking_task(move |this| {
1039 persist_chunk_into_table!(tx_kinds::table, kinds, &this.blocking_cp)
1040 }),
1041 ];
1042
1043 futures::future::try_join_all(futures)
1044 .await
1045 .map_err(|e| {
1046 tracing::error!("failed to join tx indices futures in a chunk: {e}");
1047 IndexerError::from(e)
1048 })?
1049 .into_iter()
1050 .collect::<Result<Vec<_>, _>>()
1051 .map_err(|e| {
1052 IndexerError::PostgresWrite(format!(
1053 "Failed to persist all tx indices in a chunk: {e:?}"
1054 ))
1055 })?;
1056 let elapsed = guard.stop_and_record();
1057 info!(elapsed, "Persisted {} chunked tx_indices", len);
1058 Ok(())
1059 }
1060
1061 fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1062 let guard = self
1063 .metrics
1064 .checkpoint_db_commit_latency_epoch
1065 .start_timer();
1066 let epoch_id = epoch.new_epoch.epoch;
1067
1068 transactional_blocking_with_retry!(
1069 &self.blocking_cp,
1070 |conn| {
1071 if let Some(last_epoch) = &epoch.last_epoch {
1072 info!(last_epoch.epoch, "Persisting epoch end data.");
1073 diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch.epoch)))
1074 .set(last_epoch)
1075 .execute(conn)?;
1076 }
1077
1078 info!(epoch.new_epoch.epoch, "Persisting epoch beginning info");
1079 insert_or_ignore_into!(epochs::table, &epoch.new_epoch, conn);
1080 Ok::<(), IndexerError>(())
1081 },
1082 PG_DB_COMMIT_SLEEP_DURATION
1083 )
1084 .tap_ok(|_| {
1085 let elapsed = guard.stop_and_record();
1086 info!(elapsed, epoch_id, "Persisted epoch beginning info");
1087 })
1088 .tap_err(|e| {
1089 tracing::error!("failed to persist epoch with error: {e}");
1090 })
1091 }
1092
1093 fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1094 let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1095 if let Some(last_epoch_id) = last_epoch_id {
1097 let last_db_epoch: Option<StoredEpochInfo> =
1098 read_only_blocking!(&self.blocking_cp, |conn| {
1099 epochs::table
1100 .filter(epochs::epoch.eq(last_epoch_id))
1101 .first::<StoredEpochInfo>(conn)
1102 .optional()
1103 })
1104 .context("Failed to read last epoch from PostgresDB")?;
1105 if let Some(last_epoch) = last_db_epoch {
1106 let epoch_partition_data =
1107 EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1108 let table_partitions = self.partition_manager.get_table_partitions()?;
1109 for (table, (_, last_partition)) in table_partitions {
1110 if !self
1112 .partition_manager
1113 .get_strategy(&table)
1114 .is_epoch_partitioned()
1115 {
1116 continue;
1117 }
1118 let guard = self.metrics.advance_epoch_latency.start_timer();
1119 self.partition_manager.advance_epoch(
1120 table.clone(),
1121 last_partition,
1122 &epoch_partition_data,
1123 )?;
1124 let elapsed = guard.stop_and_record();
1125 info!(
1126 elapsed,
1127 "Advanced epoch partition {} for table {}",
1128 last_partition,
1129 table.clone()
1130 );
1131 }
1132 } else {
1133 tracing::error!("last epoch: {last_epoch_id} from PostgresDB is None.");
1134 }
1135 }
1136
1137 Ok(())
1138 }
1139
1140 fn prune_checkpoints_table_by_range(
1141 &self,
1142 min_cp: u64,
1143 max_cp: u64,
1144 ) -> Result<(), IndexerError> {
1145 transactional_blocking_with_retry!(
1146 &self.blocking_cp,
1147 |conn| {
1148 diesel::delete(
1149 checkpoints::table
1150 .filter(checkpoints::sequence_number.between(min_cp as i64, max_cp as i64)),
1151 )
1152 .execute(conn)
1153 .map_err(IndexerError::from)
1154 .context("Failed to prune checkpoints table by range")?;
1155
1156 Ok::<(), IndexerError>(())
1157 },
1158 PG_DB_COMMIT_SLEEP_DURATION
1159 )
1160 }
1161
1162 fn prune_tx_global_order(
1165 &self,
1166 conn: &mut PgConnection,
1167 min_tx: i64,
1168 max_tx: i64,
1169 ) -> Result<(), IndexerError> {
1170 diesel::delete(
1171 tx_global_order::table
1172 .filter(tx_global_order::chk_tx_sequence_number.between(min_tx, max_tx)),
1173 )
1174 .execute(conn)
1175 .map_err(IndexerError::from)
1176 .context("Failed to prune tx_global_order table")
1177 .map(|_| ())
1178 }
1179
1180 fn prune_single_tx_or_event_table(
1182 &self,
1183 table: &crate::pruning::pruner::PrunableTable,
1184 min_tx: u64,
1185 max_tx: u64,
1186 ) -> Result<(), IndexerError> {
1187 use crate::pruning::pruner::PrunableTable;
1188
1189 let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1190
1191 transactional_blocking_with_retry!(
1192 &self.blocking_cp,
1193 |conn| {
1194 match table {
1195 PrunableTable::EventEmitModule => {
1197 prune_tx_or_event_indice_table!(
1198 event_emit_module,
1199 conn,
1200 min_tx,
1201 max_tx,
1202 "Failed to prune event_emit_module table"
1203 );
1204 }
1205 PrunableTable::EventEmitPackage => {
1206 prune_tx_or_event_indice_table!(
1207 event_emit_package,
1208 conn,
1209 min_tx,
1210 max_tx,
1211 "Failed to prune event_emit_package table"
1212 );
1213 }
1214 PrunableTable::EventSenders => {
1215 prune_tx_or_event_indice_table!(
1216 event_senders,
1217 conn,
1218 min_tx,
1219 max_tx,
1220 "Failed to prune event_senders table"
1221 );
1222 }
1223 PrunableTable::EventStructInstantiation => {
1224 prune_tx_or_event_indice_table!(
1225 event_struct_instantiation,
1226 conn,
1227 min_tx,
1228 max_tx,
1229 "Failed to prune event_struct_instantiation table"
1230 );
1231 }
1232 PrunableTable::EventStructModule => {
1233 prune_tx_or_event_indice_table!(
1234 event_struct_module,
1235 conn,
1236 min_tx,
1237 max_tx,
1238 "Failed to prune event_struct_module table"
1239 );
1240 }
1241 PrunableTable::EventStructName => {
1242 prune_tx_or_event_indice_table!(
1243 event_struct_name,
1244 conn,
1245 min_tx,
1246 max_tx,
1247 "Failed to prune event_struct_name table"
1248 );
1249 }
1250 PrunableTable::EventStructPackage => {
1251 prune_tx_or_event_indice_table!(
1252 event_struct_package,
1253 conn,
1254 min_tx,
1255 max_tx,
1256 "Failed to prune event_struct_package table"
1257 );
1258 }
1259
1260 PrunableTable::TxSenders => {
1262 prune_tx_or_event_indice_table!(
1263 tx_senders,
1264 conn,
1265 min_tx,
1266 max_tx,
1267 "Failed to prune tx_senders table"
1268 );
1269 }
1270 PrunableTable::TxRecipients => {
1271 prune_tx_or_event_indice_table!(
1272 tx_recipients,
1273 conn,
1274 min_tx,
1275 max_tx,
1276 "Failed to prune tx_recipients table"
1277 );
1278 }
1279 PrunableTable::TxInputObjects => {
1280 prune_tx_or_event_indice_table!(
1281 tx_input_objects,
1282 conn,
1283 min_tx,
1284 max_tx,
1285 "Failed to prune tx_input_objects table"
1286 );
1287 }
1288 PrunableTable::TxChangedObjects => {
1289 prune_tx_or_event_indice_table!(
1290 tx_changed_objects,
1291 conn,
1292 min_tx,
1293 max_tx,
1294 "Failed to prune tx_changed_objects table"
1295 );
1296 }
1297 PrunableTable::TxWrappedOrDeletedObjects => {
1298 prune_tx_or_event_indice_table!(
1299 tx_wrapped_or_deleted_objects,
1300 conn,
1301 min_tx,
1302 max_tx,
1303 "Failed to prune tx_wrapped_or_deleted_objects table"
1304 );
1305 }
1306 PrunableTable::TxCallsPkg => {
1307 prune_tx_or_event_indice_table!(
1308 tx_calls_pkg,
1309 conn,
1310 min_tx,
1311 max_tx,
1312 "Failed to prune tx_calls_pkg table"
1313 );
1314 }
1315 PrunableTable::TxCallsMod => {
1316 prune_tx_or_event_indice_table!(
1317 tx_calls_mod,
1318 conn,
1319 min_tx,
1320 max_tx,
1321 "Failed to prune tx_calls_mod table"
1322 );
1323 }
1324 PrunableTable::TxCallsFun => {
1325 prune_tx_or_event_indice_table!(
1326 tx_calls_fun,
1327 conn,
1328 min_tx,
1329 max_tx,
1330 "Failed to prune tx_calls_fun table"
1331 );
1332 }
1333 PrunableTable::TxDigests => {
1334 prune_tx_or_event_indice_table!(
1335 tx_digests,
1336 conn,
1337 min_tx,
1338 max_tx,
1339 "Failed to prune tx_digests table"
1340 );
1341 }
1342 PrunableTable::TxKinds => {
1343 prune_tx_or_event_indice_table!(
1344 tx_kinds,
1345 conn,
1346 min_tx,
1347 max_tx,
1348 "Failed to prune tx_kinds table"
1349 );
1350 }
1351 PrunableTable::TxGlobalOrder => {
1352 self.prune_tx_global_order(conn, min_tx, max_tx)?;
1353 }
1354 _ => {
1355 return Err(IndexerError::InvalidArgument(format!(
1356 "table {} is not a transaction or event index table",
1357 table.as_ref()
1358 )));
1359 }
1360 }
1361 Ok::<(), IndexerError>(())
1362 },
1363 PG_DB_COMMIT_SLEEP_DURATION
1364 )
1365 }
1366
1367 fn prune_optimistic_tx_by_global_seq(
1370 &self,
1371 start: u64,
1372 end: u64,
1373 limit: i64,
1374 ) -> Result<usize, IndexerError> {
1375 use diesel::prelude::*;
1376
1377 transactional_blocking_with_retry!(
1378 &self.blocking_cp,
1379 |conn| {
1380 let sql = r#"
1381 WITH ids_to_delete AS (
1382 SELECT global_sequence_number, optimistic_sequence_number
1383 FROM optimistic_transactions
1384 WHERE global_sequence_number BETWEEN $1 AND $2
1385 ORDER BY global_sequence_number, optimistic_sequence_number
1386 FOR UPDATE
1387 LIMIT $3
1388 )
1389 DELETE FROM optimistic_transactions otx
1390 USING ids_to_delete
1391 WHERE (otx.global_sequence_number, otx.optimistic_sequence_number) =
1392 (ids_to_delete.global_sequence_number, ids_to_delete.optimistic_sequence_number)
1393 "#;
1394 diesel::sql_query(sql)
1395 .bind::<diesel::sql_types::BigInt, _>(start as i64)
1396 .bind::<diesel::sql_types::BigInt, _>(end as i64)
1397 .bind::<diesel::sql_types::BigInt, _>(limit)
1398 .execute(conn)
1399 .map_err(IndexerError::from)
1400 .context(
1401 format!(
1402 "failed to prune optimistic_transactions table by global_sequence_number range [{start}..={end}] with limit {limit}"
1403 )
1404 .as_str(),
1405 )
1406 },
1407 PG_DB_COMMIT_SLEEP_DURATION
1408 )
1409 }
1410
1411 fn prune_backward_history_by_checkpoint_with_limit(
1412 &self,
1413 start: u64,
1414 end: u64,
1415 limit: i64,
1416 ) -> Result<usize, IndexerError> {
1417 use diesel::prelude::*;
1418
1419 transactional_blocking_with_retry!(
1420 &self.blocking_cp,
1421 |conn| {
1422 let sql = r#"
1423 WITH to_delete AS (
1424 SELECT superseded_at_checkpoint, object_id, object_version
1425 FROM objects_backward_history
1426 WHERE superseded_at_checkpoint BETWEEN $1 AND $2
1427 ORDER BY superseded_at_checkpoint, object_id, object_version
1428 FOR UPDATE
1429 LIMIT $3
1430 )
1431 DELETE FROM objects_backward_history bh
1432 USING to_delete
1433 WHERE (bh.superseded_at_checkpoint, bh.object_id, bh.object_version) =
1434 (to_delete.superseded_at_checkpoint, to_delete.object_id, to_delete.object_version)
1435 "#;
1436 diesel::sql_query(sql)
1437 .bind::<diesel::sql_types::BigInt, _>(start as i64)
1438 .bind::<diesel::sql_types::BigInt, _>(end as i64)
1439 .bind::<diesel::sql_types::BigInt, _>(limit)
1440 .execute(conn)
1441 .map_err(IndexerError::from)
1442 .context(
1443 format!(
1444 "failed to prune objects_backward_history by checkpoint range [{start}..={end}] with limit {limit}"
1445 )
1446 .as_str(),
1447 )
1448 },
1449 PG_DB_COMMIT_SLEEP_DURATION
1450 )
1451 }
1452
1453 fn prune_cp_tx_table_by_range(&self, min_cp: u64, max_cp: u64) -> Result<(), IndexerError> {
1454 transactional_blocking_with_retry!(
1455 &self.blocking_cp,
1456 |conn| {
1457 diesel::delete(
1458 pruner_cp_watermark::table.filter(
1459 pruner_cp_watermark::checkpoint_sequence_number
1460 .between(min_cp as i64, max_cp as i64),
1461 ),
1462 )
1463 .execute(conn)
1464 .map_err(IndexerError::from)
1465 .context("Failed to prune pruner_cp_watermark table by range")?;
1466 Ok::<(), IndexerError>(())
1467 },
1468 PG_DB_COMMIT_SLEEP_DURATION
1469 )
1470 }
1471
1472 fn prune_table_by_checkpoint_range(
1473 &self,
1474 table: &crate::pruning::pruner::PrunableTable,
1475 min_checkpoint: u64,
1476 max_checkpoint: u64,
1477 ) -> Result<(), IndexerError> {
1478 use crate::pruning::pruner::PrunableTable;
1479
1480 match table {
1481 PrunableTable::Checkpoints => {
1482 self.prune_checkpoints_table_by_range(min_checkpoint, max_checkpoint)
1483 }
1484 PrunableTable::PrunerCpWatermark => {
1485 self.prune_cp_tx_table_by_range(min_checkpoint, max_checkpoint)
1486 }
1487 _ => Err(IndexerError::InvalidArgument(format!(
1488 "table {} is not pruned by checkpoint",
1489 table.as_ref()
1490 ))),
1491 }
1492 }
1493
1494 fn get_network_total_transactions_by_end_of_epoch(
1495 &self,
1496 epoch: u64,
1497 ) -> Result<Option<u64>, IndexerError> {
1498 read_only_blocking!(&self.blocking_cp, |conn| {
1499 epochs::table
1500 .filter(epochs::epoch.eq(epoch as i64))
1501 .select(epochs::network_total_transactions)
1502 .get_result::<Option<i64>>(conn)
1503 })
1504 .context(format!("failed to get network total transactions in epoch {epoch}").as_str())
1505 .map(|option| option.map(|v| v as u64))
1506 }
1507
1508 fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
1509 transactional_blocking_with_retry!(
1510 &self.blocking_cp,
1511 |conn| {
1512 diesel::sql_query("REFRESH MATERIALIZED VIEW participation_metrics")
1513 .execute(conn)?;
1514 Ok::<(), IndexerError>(())
1515 },
1516 PG_DB_COMMIT_SLEEP_DURATION
1517 )
1518 .tap_ok(|_| {
1519 info!("Successfully refreshed participation_metrics");
1520 })
1521 .tap_err(|e| {
1522 tracing::error!("failed to refresh participation_metrics: {e}");
1523 })
1524 }
1525
1526 fn update_watermarks_upper_bound<E: IntoEnumIterator>(
1527 &self,
1528 watermark: CommitterWatermark,
1529 ) -> Result<(), IndexerError>
1530 where
1531 E::Iterator: Iterator<Item: AsRef<str>>,
1532 {
1533 use diesel::query_dsl::methods::FilterDsl;
1534
1535 let guard = self
1536 .metrics
1537 .checkpoint_db_commit_latency_watermarks
1538 .start_timer();
1539
1540 let upper_bound_updates = E::iter()
1541 .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
1542 .collect::<Vec<_>>();
1543
1544 transactional_blocking_with_retry!(
1545 &self.blocking_cp,
1546 |conn| {
1547 diesel::insert_into(watermarks::table)
1548 .values(&upper_bound_updates)
1549 .on_conflict(watermarks::entity)
1550 .do_update()
1551 .set((
1552 watermarks::current_epoch.eq(excluded(watermarks::current_epoch)),
1553 watermarks::max_committed_cp.eq(excluded(watermarks::max_committed_cp)),
1554 watermarks::max_committed_tx.eq(excluded(watermarks::max_committed_tx)),
1555 ))
1556 .filter(excluded(watermarks::max_committed_cp).ge(watermarks::max_committed_cp))
1557 .filter(excluded(watermarks::max_committed_tx).ge(watermarks::max_committed_tx))
1558 .filter(excluded(watermarks::current_epoch).ge(watermarks::current_epoch))
1559 .execute(conn)
1560 .map_err(IndexerError::from)
1561 .context("Failed to update watermarks upper bound")?;
1562 Ok::<(), IndexerError>(())
1563 },
1564 PG_DB_COMMIT_SLEEP_DURATION
1565 )
1566 .tap_ok(|_| {
1567 let elapsed = guard.stop_and_record();
1568 info!(elapsed, "Persisted watermarks");
1569 })
1570 .tap_err(|e| {
1571 tracing::error!("Failed to persist watermarks with error: {}", e);
1572 })
1573 }
1574
1575 fn map_epochs_to_cp_tx(
1576 &self,
1577 epochs: &[u64],
1578 ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
1579 let pool = &self.blocking_cp;
1580 let results: Vec<(i64, i64, i64)> = run_query!(pool, move |conn| {
1581 epochs::table
1582 .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
1583 .select((
1584 epochs::epoch,
1585 epochs::first_checkpoint_id,
1586 epochs::first_tx_sequence_number,
1587 ))
1588 .load::<(i64, i64, i64)>(conn)
1589 })
1590 .context("Failed to fetch first checkpoint and tx seq num for epochs")?;
1591
1592 Ok(results
1593 .into_iter()
1594 .map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
1595 .collect())
1596 }
1597
1598 fn update_watermarks_lower_bound(
1599 &self,
1600 watermarks: Vec<(PrunableTable, u64)>,
1601 ) -> Result<(), IndexerError> {
1602 use diesel::query_dsl::methods::FilterDsl;
1603
1604 let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
1605 let epoch_mapping = self.map_epochs_to_cp_tx(&epochs)?;
1606 let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
1607 .into_iter()
1608 .map(|(table, epoch)| {
1609 let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
1610 IndexerError::PersistentStorageDataCorruption(format!(
1611 "epoch {epoch} not found in epoch mapping",
1612 ))
1613 })?;
1614 Ok(StoredWatermark::from_lower_bound_update(
1615 table.as_ref(),
1616 epoch,
1617 *checkpoint,
1618 *tx,
1619 ))
1620 })
1621 .collect();
1622 let lower_bound_updates = lookups?;
1623 let guard = self
1624 .metrics
1625 .checkpoint_db_commit_latency_watermarks
1626 .start_timer();
1627 transactional_blocking_with_retry!(
1628 &self.blocking_cp,
1629 |conn| {
1630 diesel::insert_into(watermarks::table)
1631 .values(&lower_bound_updates)
1632 .on_conflict(watermarks::entity)
1633 .do_update()
1634 .set((
1635 watermarks::min_available_cp.eq(excluded(watermarks::min_available_cp)),
1636 watermarks::min_available_tx.eq(excluded(watermarks::min_available_tx)),
1637 watermarks::min_available_epoch
1638 .eq(excluded(watermarks::min_available_epoch)),
1639 watermarks::min_bounds_updated_at_timestamp_ms.eq(sql::<
1640 diesel::sql_types::BigInt,
1641 >(
1642 "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1643 )),
1644 ))
1645 .filter(excluded(watermarks::min_available_cp).gt(watermarks::min_available_cp))
1646 .filter(excluded(watermarks::min_available_tx).gt(watermarks::min_available_tx))
1647 .filter(
1648 excluded(watermarks::min_available_epoch)
1649 .gt(watermarks::min_available_epoch),
1650 )
1651 .filter(
1652 diesel::dsl::sql::<diesel::sql_types::BigInt>(
1653 "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1654 )
1655 .gt(watermarks::min_bounds_updated_at_timestamp_ms),
1656 )
1657 .execute(conn)
1658 },
1659 PG_DB_COMMIT_SLEEP_DURATION
1660 )
1661 .tap_ok(|_| {
1662 let elapsed = guard.stop_and_record();
1663 tracing::info!(elapsed, "Persisted watermarks lower bounds");
1664 })
1665 .tap_err(|e| {
1666 tracing::error!("Failed to persist watermarks with error: {}", e);
1667 })?;
1668 Ok(())
1669 }
1670
1671 fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
1672 run_query_with_retry!(
1675 &self.blocking_cp,
1676 |conn| {
1677 let stored = watermarks::table
1678 .load::<StoredWatermark>(conn)
1679 .map_err(Into::into)
1680 .context("Failed reading watermarks from PostgresDB")?;
1681 let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
1682 "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1683 ))
1684 .get_result(conn)
1685 .map_err(Into::into)
1686 .context("Failed reading current timestamp from PostgresDB")?;
1687 Ok::<_, IndexerError>((stored, timestamp))
1688 },
1689 PG_DB_COMMIT_SLEEP_DURATION
1690 )
1691 }
1692
1693 fn update_watermark_lowest_unpruned_key(
1694 &self,
1695 table: &PrunableTable,
1696 lowest_unpruned_key: u64,
1697 ) -> Result<(), IndexerError> {
1698 transactional_blocking_with_retry!(
1699 &self.blocking_cp,
1700 |conn| {
1701 diesel::update(watermarks::table.filter(watermarks::entity.eq(table.as_ref())))
1702 .set(watermarks::lowest_unpruned_key.eq(lowest_unpruned_key as i64))
1703 .execute(conn)
1704 .map_err(IndexerError::from)
1705 .context("failed to update watermark lowest_unpruned_key")?;
1706 Ok::<(), IndexerError>(())
1707 },
1708 PG_DB_COMMIT_SLEEP_DURATION
1709 )
1710 }
1711
1712 fn get_watermark_by_entity(
1713 &self,
1714 entity: &str,
1715 ) -> Result<Option<StoredWatermark>, IndexerError> {
1716 run_query_with_retry!(
1717 &self.blocking_cp,
1718 |conn| {
1719 watermarks::table
1720 .filter(watermarks::entity.eq(entity))
1721 .first::<StoredWatermark>(conn)
1722 .optional()
1723 .map_err(Into::into)
1724 .context("failed reading watermark by entity from PostgresDB")
1725 },
1726 PG_DB_COMMIT_SLEEP_DURATION
1727 )
1728 }
1729
1730 async fn execute_in_blocking_worker<F, R>(&self, f: F) -> Result<R, IndexerError>
1731 where
1732 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1733 R: Send + 'static,
1734 {
1735 let this = self.clone();
1736 let current_span = tracing::Span::current();
1737 tokio::task::spawn_blocking(move || {
1738 mark_in_blocking_pool();
1739 let _guard = current_span.enter();
1740 f(this)
1741 })
1742 .await
1743 .map_err(Into::into)
1744 .and_then(std::convert::identity)
1745 }
1746
1747 fn spawn_blocking_task<F, R>(
1748 &self,
1749 f: F,
1750 ) -> tokio::task::JoinHandle<std::result::Result<R, IndexerError>>
1751 where
1752 F: FnOnce(Self) -> Result<R, IndexerError> + Send + 'static,
1753 R: Send + 'static,
1754 {
1755 let this = self.clone();
1756 let current_span = tracing::Span::current();
1757 let guard = self.metrics.tokio_blocking_task_wait_latency.start_timer();
1758 tokio::task::spawn_blocking(move || {
1759 mark_in_blocking_pool();
1760 let _guard = current_span.enter();
1761 let _elapsed = guard.stop_and_record();
1762 f(this)
1763 })
1764 }
1765
1766 fn spawn_task<F, Fut, R>(&self, f: F) -> tokio::task::JoinHandle<Result<R, IndexerError>>
1767 where
1768 F: FnOnce(Self) -> Fut + Send + 'static,
1769 Fut: std::future::Future<Output = Result<R, IndexerError>> + Send + 'static,
1770 R: Send + 'static,
1771 {
1772 let this = self.clone();
1773 tokio::task::spawn(async move { f(this).await })
1774 }
1775}
1776
1777#[async_trait]
1778impl IndexerStore for PgIndexerStore {
1779 async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1780 self.execute_in_blocking_worker(|this| this.get_latest_checkpoint_sequence_number())
1781 .await
1782 }
1783
1784 async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1785 self.execute_in_blocking_worker(|this| this.get_prunable_epoch_range())
1786 .await
1787 }
1788
1789 async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1790 self.execute_in_blocking_worker(|this| this.get_available_checkpoint_range())
1791 .await
1792 }
1793
1794 async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1795 self.execute_in_blocking_worker(|this| this.get_chain_identifier())
1796 .await
1797 }
1798
1799 fn persist_objects_in_existing_transaction(
1800 &self,
1801 conn: &mut PgConnection,
1802 object_changes: Vec<TransactionObjectChangesToCommit>,
1803 ) -> Result<(), IndexerError> {
1804 if object_changes.is_empty() {
1805 return Ok(());
1806 }
1807
1808 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1809 let object_mutations = indexed_mutations
1810 .into_iter()
1811 .map(StoredObject::from)
1812 .collect::<Vec<_>>();
1813 let object_deletions = indexed_deletions
1814 .into_iter()
1815 .map(StoredDeletedObject::from)
1816 .collect::<Vec<_>>();
1817
1818 self.persist_object_mutation_chunk_in_existing_transaction(conn, object_mutations)?;
1819 self.persist_object_deletion_chunk_in_existing_transaction(conn, object_deletions)?;
1820
1821 Ok(())
1822 }
1823
1824 async fn persist_object_versions(
1825 &self,
1826 object_versions: Vec<StoredObjectVersion>,
1827 ) -> Result<(), IndexerError> {
1828 if object_versions.is_empty() {
1829 return Ok(());
1830 }
1831
1832 let guard = self
1833 .metrics
1834 .checkpoint_db_commit_latency_objects_version
1835 .start_timer();
1836
1837 let object_versions_count = object_versions.len();
1838
1839 let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1840 let futures = chunks
1841 .into_iter()
1842 .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
1843 .collect::<Vec<_>>();
1844
1845 futures::future::try_join_all(futures)
1846 .await
1847 .map_err(|e| {
1848 tracing::error!("failed to join persist_object_version_chunk futures: {e}");
1849 IndexerError::from(e)
1850 })?
1851 .into_iter()
1852 .collect::<Result<Vec<_>, _>>()
1853 .map_err(|e| {
1854 IndexerError::PostgresWrite(format!(
1855 "Failed to persist all objects version chunks: {e:?}"
1856 ))
1857 })?;
1858 let elapsed = guard.stop_and_record();
1859 info!(elapsed, "Persisted {object_versions_count} object versions");
1860 Ok(())
1861 }
1862
1863 async fn persist_checkpoints(
1864 &self,
1865 checkpoints: Vec<IndexedCheckpoint>,
1866 ) -> Result<(), IndexerError> {
1867 self.execute_in_blocking_worker(move |this| this.persist_checkpoints(checkpoints))
1868 .await
1869 }
1870
1871 async fn persist_transactions(
1872 &self,
1873 transactions: Vec<IndexedTransaction>,
1874 ) -> Result<(), IndexerError> {
1875 let guard = self
1876 .metrics
1877 .checkpoint_db_commit_latency_transactions
1878 .start_timer();
1879 let len = transactions.len();
1880
1881 let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1882 let futures = chunks
1883 .into_iter()
1884 .map(|c| self.spawn_blocking_task(move |this| this.persist_transactions_chunk(c)));
1885
1886 futures::future::try_join_all(futures)
1887 .await
1888 .map_err(|e| {
1889 tracing::error!("failed to join persist_transactions_chunk futures: {e}");
1890 IndexerError::from(e)
1891 })?
1892 .into_iter()
1893 .collect::<Result<Vec<_>, _>>()
1894 .map_err(|e| {
1895 IndexerError::PostgresWrite(format!(
1896 "Failed to persist all transactions chunks: {e:?}"
1897 ))
1898 })?;
1899 let elapsed = guard.stop_and_record();
1900 info!(elapsed, "Persisted {} transactions", len);
1901 Ok(())
1902 }
1903
1904 fn persist_optimistic_transaction_in_existing_transaction(
1905 &self,
1906 conn: &mut PgConnection,
1907 transaction: OptimisticTransaction,
1908 ) -> Result<(), IndexerError> {
1909 insert_or_ignore_into!(optimistic_transactions::table, &transaction, conn);
1910 Ok(())
1911 }
1912
1913 async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
1914 if events.is_empty() {
1915 return Ok(());
1916 }
1917 let len = events.len();
1918 let guard = self
1919 .metrics
1920 .checkpoint_db_commit_latency_events
1921 .start_timer();
1922 let chunks = chunk!(events, self.config.parallel_chunk_size);
1923 let futures = chunks
1924 .into_iter()
1925 .map(|c| self.spawn_blocking_task(move |this| this.persist_events_chunk(c)));
1926
1927 futures::future::try_join_all(futures)
1928 .await
1929 .map_err(|e| {
1930 tracing::error!("failed to join persist_events_chunk futures: {e}");
1931 IndexerError::from(e)
1932 })?
1933 .into_iter()
1934 .collect::<Result<Vec<_>, _>>()
1935 .map_err(|e| {
1936 IndexerError::PostgresWrite(format!("Failed to persist all events chunks: {e:?}"))
1937 })?;
1938 let elapsed = guard.stop_and_record();
1939 info!(elapsed, "Persisted {} events", len);
1940 Ok(())
1941 }
1942
1943 async fn persist_displays(
1944 &self,
1945 display_updates: BTreeMap<String, StoredDisplay>,
1946 ) -> Result<(), IndexerError> {
1947 if display_updates.is_empty() {
1948 return Ok(());
1949 }
1950
1951 self.spawn_blocking_task(move |this| this.persist_display_updates(display_updates))
1952 .await?
1953 }
1954
1955 fn persist_displays_in_existing_transaction(
1956 &self,
1957 conn: &mut PgConnection,
1958 display_updates: Vec<&StoredDisplay>,
1959 ) -> Result<(), IndexerError> {
1960 if display_updates.is_empty() {
1961 return Ok(());
1962 }
1963
1964 on_conflict_do_update_with_condition!(
1965 display::table,
1966 display_updates,
1967 display::object_type,
1968 (
1969 display::id.eq(excluded(display::id)),
1970 display::version.eq(excluded(display::version)),
1971 display::bcs.eq(excluded(display::bcs)),
1972 ),
1973 excluded(display::version).gt(display::version),
1974 conn
1975 );
1976
1977 Ok(())
1978 }
1979
1980 async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
1981 if packages.is_empty() {
1982 return Ok(());
1983 }
1984 self.execute_in_blocking_worker(move |this| this.persist_packages(packages))
1985 .await
1986 }
1987
1988 async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
1989 if indices.is_empty() {
1990 return Ok(());
1991 }
1992 let len = indices.len();
1993 let guard = self
1994 .metrics
1995 .checkpoint_db_commit_latency_event_indices
1996 .start_timer();
1997 let chunks = chunk!(indices, self.config.parallel_chunk_size);
1998
1999 let futures = chunks.into_iter().map(|chunk| {
2000 self.spawn_task(move |this: Self| async move {
2001 this.persist_event_indices_chunk(chunk).await
2002 })
2003 });
2004
2005 futures::future::try_join_all(futures)
2006 .await
2007 .map_err(|e| {
2008 tracing::error!("failed to join persist_event_indices_chunk futures: {e}");
2009 IndexerError::from(e)
2010 })?
2011 .into_iter()
2012 .collect::<Result<Vec<_>, _>>()
2013 .map_err(|e| {
2014 IndexerError::PostgresWrite(format!(
2015 "Failed to persist all event_indices chunks: {e:?}"
2016 ))
2017 })?;
2018 let elapsed = guard.stop_and_record();
2019 info!(elapsed, "Persisted {} event_indices chunks", len);
2020 Ok(())
2021 }
2022
2023 async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2024 self.execute_in_blocking_worker(move |this| this.persist_epoch(epoch))
2025 .await
2026 }
2027
2028 async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2029 self.execute_in_blocking_worker(move |this| this.advance_epoch(epoch))
2030 .await
2031 }
2032
2033 async fn get_network_total_transactions_by_end_of_epoch(
2034 &self,
2035 epoch: u64,
2036 ) -> Result<Option<u64>, IndexerError> {
2037 self.execute_in_blocking_worker(move |this| {
2038 this.get_network_total_transactions_by_end_of_epoch(epoch)
2039 })
2040 .await
2041 }
2042
2043 async fn refresh_participation_metrics(&self) -> Result<(), IndexerError> {
2044 self.execute_in_blocking_worker(move |this| this.refresh_participation_metrics())
2045 .await
2046 }
2047
2048 async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
2049 &self,
2050 watermark: CommitterWatermark,
2051 ) -> Result<(), IndexerError>
2052 where
2053 E::Iterator: Iterator<Item: AsRef<str>>,
2054 {
2055 self.execute_in_blocking_worker(move |this| {
2056 this.update_watermarks_upper_bound::<E>(watermark)
2057 })
2058 .await
2059 }
2060
2061 fn as_any(&self) -> &dyn StdAny {
2062 self
2063 }
2064
2065 fn persist_protocol_configs_and_feature_flags(
2068 &self,
2069 chain_id: Vec<u8>,
2070 ) -> Result<(), IndexerError> {
2071 let chain_id = ChainIdentifier::from(
2072 CheckpointDigest::from_bytes(chain_id).expect("unable to convert chain id"),
2073 );
2074
2075 let mut all_configs = vec![];
2076 let mut all_flags = vec![];
2077
2078 let (start_version, end_version) = self.get_protocol_version_index_range()?;
2079 info!(
2080 "Persisting protocol configs with start_version: {}, end_version: {}",
2081 start_version, end_version
2082 );
2083
2084 for version in start_version..=end_version {
2087 let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2088 (version as u64).into(),
2089 chain_id.chain(),
2090 )
2091 .ok_or(IndexerError::Generic(format!(
2092 "Unable to fetch protocol version {} and chain {:?}",
2093 version,
2094 chain_id.chain()
2095 )))?;
2096 let configs_vec = protocol_configs
2097 .attr_map()
2098 .into_iter()
2099 .map(|(k, v)| StoredProtocolConfig {
2100 protocol_version: version,
2101 config_name: k,
2102 config_value: v.map(|v| v.to_string()),
2103 })
2104 .collect::<Vec<_>>();
2105 all_configs.extend(configs_vec);
2106
2107 let feature_flags = protocol_configs
2108 .feature_map()
2109 .into_iter()
2110 .map(|(k, v)| StoredFeatureFlag {
2111 protocol_version: version,
2112 flag_name: k,
2113 flag_value: v,
2114 })
2115 .collect::<Vec<_>>();
2116 all_flags.extend(feature_flags);
2117 }
2118
2119 transactional_blocking_with_retry!(
2120 &self.blocking_cp,
2121 |conn| {
2122 for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2123 insert_or_ignore_into!(protocol_configs::table, config_chunk, conn);
2124 }
2125 for flag_chunk in all_flags.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2126 insert_or_ignore_into!(feature_flags::table, flag_chunk, conn);
2127 }
2128 Ok::<(), IndexerError>(())
2129 },
2130 PG_DB_COMMIT_SLEEP_DURATION
2131 )?;
2132 Ok(())
2133 }
2134
2135 async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2136 if indices.is_empty() {
2137 return Ok(());
2138 }
2139 let len = indices.len();
2140 let guard = self
2141 .metrics
2142 .checkpoint_db_commit_latency_tx_indices
2143 .start_timer();
2144 let chunks = chunk!(indices, self.config.parallel_chunk_size);
2145
2146 let futures = chunks.into_iter().map(|chunk| {
2147 self.spawn_task(move |this: Self| async move {
2148 this.persist_tx_indices_chunk_v2(chunk).await
2149 })
2150 });
2151 futures::future::try_join_all(futures)
2152 .await
2153 .map_err(|e| {
2154 tracing::error!("failed to join persist_tx_indices_chunk futures: {e}");
2155 IndexerError::from(e)
2156 })?
2157 .into_iter()
2158 .collect::<Result<Vec<_>, _>>()
2159 .map_err(|e| {
2160 IndexerError::PostgresWrite(format!(
2161 "Failed to persist all tx_indices chunks: {e:?}"
2162 ))
2163 })?;
2164 let elapsed = guard.stop_and_record();
2165 info!(elapsed, "Persisted {} tx_indices chunks", len);
2166 Ok(())
2167 }
2168
2169 async fn persist_objects(
2170 &self,
2171 objects: Vec<CheckpointObjectChanges>,
2172 ) -> Result<(), IndexerError> {
2173 if objects.is_empty() {
2174 return Ok(());
2175 }
2176 let guard = self
2177 .metrics
2178 .checkpoint_db_commit_latency_objects
2179 .start_timer();
2180 let CheckpointObjectChanges {
2181 changed_objects: mutations,
2182 deleted_objects: deletions,
2183 } = retain_latest_objects_from_checkpoint_batch(objects);
2184 let mutation_len = mutations.len();
2185 let deletion_len = deletions.len();
2186
2187 let mutation_chunks = chunk!(mutations, self.config.parallel_objects_chunk_size);
2188 let deletion_chunks = chunk!(deletions, self.config.parallel_objects_chunk_size);
2189 let mutation_futures = mutation_chunks
2190 .into_iter()
2191 .map(|c| self.spawn_blocking_task(move |this| this.persist_changed_objects(c)));
2192 let deletion_futures = deletion_chunks
2193 .into_iter()
2194 .map(|c| self.spawn_blocking_task(move |this| this.persist_removed_objects(c)));
2195 futures::future::try_join_all(mutation_futures.chain(deletion_futures))
2196 .await
2197 .map_err(|e| {
2198 tracing::error!("failed to join futures for persisting objects: {e}");
2199 IndexerError::from(e)
2200 })?
2201 .into_iter()
2202 .collect::<Result<Vec<_>, _>>()
2203 .map_err(|e| {
2204 IndexerError::PostgresWrite(format!("Failed to persist all object chunks: {e:?}",))
2205 })?;
2206
2207 let elapsed = guard.stop_and_record();
2208 info!(
2209 elapsed,
2210 "Persisted objects with {mutation_len} mutations and {deletion_len} deletions",
2211 );
2212 Ok(())
2213 }
2214
2215 async fn persist_object_backward_history(
2216 &self,
2217 objects: Vec<StoredBackwardHistoryObject>,
2218 ) -> Result<(), IndexerError> {
2219 if objects.is_empty() {
2220 return Ok(());
2221 }
2222 let len = objects.len();
2223 let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
2224 let futures = chunks.into_iter().map(|c| {
2225 self.spawn_blocking_task(move |this| this.persist_objects_backward_history_chunk(c))
2226 });
2227
2228 futures::future::try_join_all(futures)
2229 .await
2230 .map_err(|e| {
2231 tracing::error!(
2232 "failed to join persist_objects_backward_history_chunk futures: {e}"
2233 );
2234 IndexerError::from(e)
2235 })?
2236 .into_iter()
2237 .collect::<Result<Vec<_>, _>>()
2238 .map_err(|e| {
2239 IndexerError::PostgresWrite(format!(
2240 "Failed to persist all objects backward history chunks: {e:?}"
2241 ))
2242 })?;
2243 info!("Persisted {} objects backward history", len);
2244 Ok(())
2245 }
2246
2247 async fn persist_checkpointed_objects(
2248 &self,
2249 objects: Vec<CheckpointObjectChanges>,
2250 ) -> Result<(), IndexerError> {
2251 if objects.is_empty() {
2252 return Ok(());
2253 }
2254 let CheckpointObjectChanges {
2255 changed_objects: mutations,
2256 deleted_objects: deletions,
2257 } = retain_latest_objects_from_checkpoint_batch(objects);
2258 let mutation_len = mutations.len();
2259 let deletion_len = deletions.len();
2260
2261 let checkpointed_objects: Vec<StoredCheckpointedObject> = mutations
2262 .into_iter()
2263 .map(|live| {
2264 let (indexed, _tx_digest) = live.split();
2265 StoredCheckpointedObject::try_from(indexed)
2266 })
2267 .chain(
2268 deletions
2269 .into_iter()
2270 .map(|removed| Ok(StoredCheckpointedObject::from(removed.indexed_object))),
2271 )
2272 .collect::<Result<Vec<_>, IndexerError>>()?;
2273
2274 let len = checkpointed_objects.len();
2275 let chunks = chunk!(
2276 checkpointed_objects,
2277 self.config.parallel_objects_chunk_size
2278 );
2279 let futures = chunks.into_iter().map(|c| {
2280 self.spawn_blocking_task(move |this| this.persist_checkpointed_objects_chunk(c))
2281 });
2282 futures::future::try_join_all(futures)
2283 .await
2284 .map_err(|e| {
2285 tracing::error!("failed to join futures for persisting checkpointed objects: {e}");
2286 IndexerError::from(e)
2287 })?
2288 .into_iter()
2289 .collect::<Result<Vec<_>, _>>()
2290 .map_err(|e| {
2291 IndexerError::PostgresWrite(format!(
2292 "Failed to persist all checkpointed object chunks: {e:?}",
2293 ))
2294 })?;
2295
2296 info!(
2297 "Persisted {len} checkpointed objects ({mutation_len} mutations, {deletion_len} deletions)",
2298 );
2299 Ok(())
2300 }
2301
2302 async fn persist_tx_global_order(
2303 &self,
2304 tx_order: Vec<TxGlobalOrder>,
2305 ) -> Result<(), IndexerError> {
2306 let guard = self
2307 .metrics
2308 .checkpoint_db_commit_latency_tx_insertion_order
2309 .start_timer();
2310 let len = tx_order.len();
2311
2312 let chunks = chunk!(tx_order, self.config.parallel_chunk_size);
2313 let futures = chunks
2314 .into_iter()
2315 .map(|c| self.spawn_blocking_task(move |this| this.persist_tx_global_order_chunk(c)));
2316
2317 futures::future::try_join_all(futures)
2318 .await
2319 .map_err(|e| {
2320 tracing::error!("failed to join persist_tx_global_order_chunk futures: {e}",);
2321 IndexerError::from(e)
2322 })?
2323 .into_iter()
2324 .collect::<Result<Vec<_>, _>>()
2325 .map_err(|e| {
2326 IndexerError::PostgresWrite(format!(
2327 "Failed to persist all txs insertion order chunks: {e:?}",
2328 ))
2329 })?;
2330 let elapsed = guard.stop_and_record();
2331 info!(elapsed, "Persisted {len} txs insertion orders");
2332 Ok(())
2333 }
2334
2335 async fn update_watermarks_lower_bound(
2336 &self,
2337 watermarks: Vec<(PrunableTable, u64)>,
2338 ) -> Result<(), IndexerError> {
2339 self.execute_in_blocking_worker(move |this| this.update_watermarks_lower_bound(watermarks))
2340 .await
2341 }
2342
2343 async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
2344 self.execute_in_blocking_worker(move |this| this.get_watermarks())
2345 .await
2346 }
2347
2348 async fn prune_table_by_checkpoint_range(
2349 &self,
2350 table: &crate::pruning::pruner::PrunableTable,
2351 min_checkpoint: u64,
2352 max_checkpoint: u64,
2353 ) -> Result<(), IndexerError> {
2354 let table_clone = *table;
2355 self.execute_in_blocking_worker(move |this| {
2356 this.prune_table_by_checkpoint_range(&table_clone, min_checkpoint, max_checkpoint)
2357 })
2358 .await
2359 }
2360
2361 async fn prune_table_by_tx_range(
2362 &self,
2363 table: &crate::pruning::pruner::PrunableTable,
2364 min_tx: u64,
2365 max_tx: u64,
2366 ) -> Result<(), IndexerError> {
2367 let table_clone = *table;
2368 self.execute_in_blocking_worker(move |this| {
2369 this.prune_single_tx_or_event_table(&table_clone, min_tx, max_tx)
2370 })
2371 .await
2372 }
2373
2374 async fn prune_table_by_global_seq_with_limit(
2375 &self,
2376 table: &crate::pruning::pruner::PrunableTable,
2377 start: u64,
2378 end: u64,
2379 limit: i64,
2380 ) -> Result<usize, IndexerError> {
2381 use crate::pruning::pruner::PrunableTable;
2382
2383 if !matches!(table, PrunableTable::OptimisticTransactions) {
2384 return Err(IndexerError::InvalidArgument(format!(
2385 "table {} does not support pruning by global order with limit",
2386 table.as_ref()
2387 )));
2388 }
2389
2390 self.execute_in_blocking_worker(move |this| {
2391 this.prune_optimistic_tx_by_global_seq(start, end, limit)
2392 })
2393 .await
2394 }
2395
2396 async fn prune_table_by_checkpoint_with_limit(
2397 &self,
2398 table: &crate::pruning::pruner::PrunableTable,
2399 start: u64,
2400 end: u64,
2401 limit: i64,
2402 ) -> Result<usize, IndexerError> {
2403 use crate::pruning::pruner::PrunableTable;
2404
2405 if !matches!(table, PrunableTable::ObjectsBackwardHistory) {
2406 return Err(IndexerError::InvalidArgument(format!(
2407 "table {} does not support pruning by checkpoint with limit",
2408 table.as_ref()
2409 )));
2410 }
2411
2412 self.execute_in_blocking_worker(move |this| {
2413 this.prune_backward_history_by_checkpoint_with_limit(start, end, limit)
2414 })
2415 .await
2416 }
2417
2418 async fn update_watermark_lowest_unpruned_key(
2419 &self,
2420 table: &PrunableTable,
2421 lowest_unpruned_key: u64,
2422 ) -> Result<(), IndexerError> {
2423 let table = *table;
2424 self.execute_in_blocking_worker(move |this| {
2425 this.update_watermark_lowest_unpruned_key(&table, lowest_unpruned_key)
2426 })
2427 .await
2428 }
2429
2430 async fn get_watermark_by_entity(
2431 &self,
2432 entity: String,
2433 ) -> Result<Option<StoredWatermark>, IndexerError> {
2434 self.execute_in_blocking_worker(move |this| this.get_watermark_by_entity(&entity))
2435 .await
2436 }
2437}
2438
2439fn retain_latest_indexed_objects(
2445 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2446) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2447 use std::collections::HashMap;
2448
2449 let mut mutations = HashMap::<ObjectId, IndexedObject>::new();
2450 let mut deletions = HashMap::<ObjectId, IndexedDeletedObject>::new();
2451
2452 for change in tx_object_changes {
2453 for mutation in change.changed_objects {
2457 let id = mutation.object.id();
2458 let version = mutation.object.version();
2459
2460 if let Some(existing) = deletions.remove(&id) {
2461 assert!(
2462 existing.object_version < version,
2463 "mutation version ({version}) should be greater than existing deletion version ({}) for object {id}",
2464 existing.object_version
2465 );
2466 }
2467
2468 if let Some(existing) = mutations.insert(id, mutation) {
2469 assert!(
2470 existing.object.version() < version,
2471 "mutation version ({version}) should be greater than existing mutation version ({}) for object {id}",
2472 existing.object.version()
2473 );
2474 }
2475 }
2476 for deletion in change.deleted_objects {
2478 let id = deletion.object_id;
2479 let version = deletion.object_version;
2480
2481 if let Some(existing) = mutations.remove(&id) {
2482 assert!(
2483 existing.object.version() < version,
2484 "deletion version ({version}) should be greater than existing mutation version ({}) for object {id}",
2485 existing.object.version(),
2486 );
2487 }
2488
2489 if let Some(existing) = deletions.insert(id, deletion) {
2490 assert!(
2491 existing.object_version < version,
2492 "deletion version ({version}) should be greater than existing deletion version ({}) for object {id}",
2493 existing.object_version
2494 );
2495 }
2496 }
2497 }
2498
2499 (
2500 mutations.into_values().collect(),
2501 deletions.into_values().collect(),
2502 )
2503}