1use std::{collections::BTreeMap, slice, sync::Arc};
6
7use async_trait::async_trait;
8use iota_data_ingestion_core::Worker;
9use iota_json_rpc_types::IotaTransactionKind;
10use iota_metrics::{get_metrics, spawn_monitored_task};
11use iota_rest_api::{CheckpointData, CheckpointTransaction};
12use iota_types::{
13 base_types::ObjectID,
14 dynamic_field::{DynamicFieldInfo, DynamicFieldType},
15 effects::TransactionEffectsAPI,
16 event::{SystemEpochInfoEvent, SystemEpochInfoEventV1, SystemEpochInfoEventV2},
17 iota_system_state::{IotaSystemStateTrait, get_iota_system_state},
18 messages_checkpoint::{
19 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
20 },
21 object::{Object, Owner},
22 transaction::TransactionDataAPI,
23};
24use itertools::Itertools;
25use move_core_types::{
26 self,
27 language_storage::{StructTag, TypeTag},
28};
29use tokio_util::sync::CancellationToken;
30use tracing::info;
31
32use crate::{
33 db::ConnectionPool,
34 errors::IndexerError,
35 handlers::{
36 CheckpointDataToCommit, EpochToCommit, TransactionObjectChangesToCommit,
37 committer::start_tx_checkpoint_commit_task,
38 tx_processor::{EpochEndIndexingObjectStore, TxChangesProcessor},
39 },
40 metrics::IndexerMetrics,
41 models::{display::StoredDisplay, obj_indices::StoredObjectVersion},
42 store::{IndexerStore, PgIndexerStore},
43 types::{
44 EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent,
45 IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult,
46 IotaSystemStateSummaryView, TxIndex,
47 },
48};
49
/// Default capacity of the bounded channel between the indexing worker and the
/// checkpoint commit task; overridable via the `CHECKPOINT_QUEUE_SIZE` env var.
const CHECKPOINT_QUEUE_SIZE: usize = 100;
51
52pub async fn new_handlers(
53 state: PgIndexerStore,
54 metrics: IndexerMetrics,
55 next_checkpoint_sequence_number: CheckpointSequenceNumber,
56 cancel: CancellationToken,
57) -> Result<CheckpointHandler, IndexerError> {
58 let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE")
59 .unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string())
60 .parse::<usize>()
61 .unwrap();
62 let global_metrics = get_metrics().unwrap();
63 let (indexed_checkpoint_sender, indexed_checkpoint_receiver) =
64 iota_metrics::metered_channel::channel(
65 checkpoint_queue_size,
66 &global_metrics
67 .channel_inflight
68 .with_label_values(&["checkpoint_indexing"]),
69 );
70
71 let state_clone = state.clone();
72 let metrics_clone = metrics.clone();
73 spawn_monitored_task!(start_tx_checkpoint_commit_task(
74 state_clone,
75 metrics_clone,
76 indexed_checkpoint_receiver,
77 next_checkpoint_sequence_number,
78 cancel.clone()
79 ));
80 Ok(CheckpointHandler::new(
81 state,
82 metrics,
83 indexed_checkpoint_sender,
84 ))
85}
86
/// Ingestion worker that indexes each downloaded checkpoint and forwards the
/// result to the commit task over a bounded, metered channel.
pub struct CheckpointHandler {
    /// Postgres-backed store; consulted during epoch-boundary indexing.
    state: PgIndexerStore,
    /// Indexer-specific metrics (lag gauges, latency timers, ...).
    metrics: IndexerMetrics,
    /// Sending half of the channel consumed by the commit task.
    indexed_checkpoint_sender: iota_metrics::metered_channel::Sender<CheckpointDataToCommit>,
}
92
#[async_trait]
impl Worker for CheckpointHandler {
    // No per-checkpoint value is returned; results flow through the channel.
    type Message = ();
    type Error = IndexerError;

    /// Indexes one downloaded checkpoint and forwards the result to the
    /// committer, recording download-lag metrics along the way.
    async fn process_checkpoint(
        &self,
        checkpoint: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        self.metrics
            .latest_fullnode_checkpoint_sequence_number
            .set(checkpoint.checkpoint_summary.sequence_number as i64);
        // Download lag = wall-clock now minus the checkpoint's on-chain
        // timestamp (both in milliseconds).
        let time_now_ms = chrono::Utc::now().timestamp_millis();
        let cp_download_lag = time_now_ms - checkpoint.checkpoint_summary.timestamp_ms as i64;
        info!(
            "checkpoint download lag for cp {}: {} ms",
            checkpoint.checkpoint_summary.sequence_number, cp_download_lag
        );
        self.metrics.download_lag_ms.set(cp_download_lag);
        self.metrics
            .max_downloaded_checkpoint_sequence_number
            .set(checkpoint.checkpoint_summary.sequence_number as i64);
        self.metrics
            .downloaded_checkpoint_timestamp_ms
            .set(checkpoint.checkpoint_summary.timestamp_ms as i64);
        info!(
            "Indexer lag: downloaded checkpoint {} with time now {} and checkpoint time {}",
            checkpoint.checkpoint_summary.sequence_number,
            time_now_ms,
            checkpoint.checkpoint_summary.timestamp_ms
        );

        // Packages are extracted synchronously first so the async indexing
        // step receives them ready-made.
        let checkpoint_data = Self::index_checkpoint(
            self.state.clone().into(),
            &checkpoint,
            Arc::new(self.metrics.clone()),
            Self::index_packages(slice::from_ref(&checkpoint), &self.metrics),
        )
        .await?;
        // A send error means the commit task dropped its receiver (shutdown).
        self.indexed_checkpoint_sender
            .send(checkpoint_data)
            .await
            .map_err(|_| {
                IndexerError::MpscChannel(
                    "Failed to send checkpoint data, receiver half closed".into(),
                )
            })?;
        Ok(())
    }
}
143
144impl CheckpointHandler {
    /// Assembles a handler from its parts; use [`new_handlers`] to also spawn
    /// the accompanying commit task.
    fn new(
        state: PgIndexerStore,
        metrics: IndexerMetrics,
        indexed_checkpoint_sender: iota_metrics::metered_channel::Sender<CheckpointDataToCommit>,
    ) -> Self {
        Self {
            state,
            metrics,
            indexed_checkpoint_sender,
        }
    }
156
    /// Derives epoch-boundary information from a checkpoint, if any.
    ///
    /// Returns `Some` for the genesis checkpoint (sequence number 0) and for
    /// an end-of-epoch checkpoint (`end_of_epoch_data` present); `None` for
    /// every other checkpoint.
    async fn index_epoch(
        state: Arc<PgIndexerStore>,
        data: &CheckpointData,
    ) -> Result<Option<EpochToCommit>, IndexerError> {
        let checkpoint_object_store = EpochEndIndexingObjectStore::new(data);

        let CheckpointData {
            transactions,
            checkpoint_summary,
            checkpoint_contents: _,
        } = data;

        // Genesis: there is no previous epoch to close out, only the new one.
        if *checkpoint_summary.sequence_number() == 0 {
            info!("Processing genesis epoch");
            let system_state =
                get_iota_system_state(&checkpoint_object_store)?.into_iota_system_state_summary();
            return Ok(Some(EpochToCommit {
                last_epoch: None,
                new_epoch: IndexedEpochInfo::from_new_system_state_summary(
                    &system_state,
                    0, None,
                ),
                network_total_transactions: 0,
            }));
        }

        // Not an epoch boundary: nothing to commit.
        if checkpoint_summary.end_of_epoch_data.is_none() {
            return Ok(None);
        }

        let system_state =
            get_iota_system_state(&checkpoint_object_store)?.into_iota_system_state_summary();
        // Find the SystemEpochInfoEvent (V1 or V2) emitted during the epoch
        // change. Deserialization uses `expect` because the event type was
        // already validated by the `find` predicate; absence of the event in
        // an end-of-epoch checkpoint is considered a fatal invariant breach.
        let event = transactions
            .iter()
            .flat_map(|t| t.events.as_ref().map(|e| &e.data))
            .flatten()
            .find(|ev| ev.is_system_epoch_info_event_v1() || ev.is_system_epoch_info_event_v2())
            .map(|ev| {
                if ev.is_system_epoch_info_event_v2() {
                    SystemEpochInfoEvent::V2(
                        bcs::from_bytes::<SystemEpochInfoEventV2>(&ev.contents).expect(
                            "event deserialization should succeed as type was pre-validated",
                        ),
                    )
                } else {
                    SystemEpochInfoEvent::V1(
                        bcs::from_bytes::<SystemEpochInfoEventV1>(&ev.contents).expect(
                            "event deserialization should succeed as type was pre-validated",
                        ),
                    )
                }
            })
            .unwrap_or_else(|| {
                panic!(
                    "Can't find SystemEpochInfoEvent in epoch end checkpoint {}",
                    checkpoint_summary.sequence_number()
                )
            });

        // NOTE(review): `epoch` here appears to be the newly started epoch
        // (system state read after the epoch change), so the epoch that just
        // ended is `epoch - 1` and the one before it `epoch - 2` — confirm
        // against `into_iota_system_state_summary` semantics. Epoch 1 has no
        // predecessor-of-predecessor, hence the hardcoded 0.
        let epoch = system_state.epoch();
        let network_tx_count_prev_epoch = match epoch {
            1 => Ok(0),
            _ => {
                let last_epoch = epoch - 2;
                state
                    .get_network_total_transactions_by_end_of_epoch(last_epoch)
                    .await
            }
        }?;

        Ok(Some(EpochToCommit {
            last_epoch: Some(IndexedEpochInfo::from_end_of_epoch_data(
                &system_state,
                checkpoint_summary,
                &event,
                network_tx_count_prev_epoch,
            )),
            new_epoch: IndexedEpochInfo::from_new_system_state_summary(
                &system_state,
                checkpoint_summary.sequence_number + 1, Some(&event),
            ),
            network_total_transactions: checkpoint_summary.network_total_transactions,
        }))
    }
254
255 fn derive_object_versions(
256 object_history_changes: &TransactionObjectChangesToCommit,
257 ) -> Vec<StoredObjectVersion> {
258 let mut object_versions = vec![];
259 for changed_obj in object_history_changes.changed_objects.iter() {
260 object_versions.push(changed_obj.into());
261 }
262 for deleted_obj in object_history_changes.deleted_objects.iter() {
263 object_versions.push(deleted_obj.into());
264 }
265 object_versions
266 }
267
    /// Indexes a full checkpoint blob into a [`CheckpointDataToCommit`]:
    /// epoch-boundary info, object changes (latest-live and history),
    /// transactions, events, index rows and `Display` updates.
    async fn index_checkpoint(
        state: Arc<PgIndexerStore>,
        data: &CheckpointData,
        metrics: Arc<IndexerMetrics>,
        packages: Vec<IndexedPackage>,
    ) -> Result<CheckpointDataToCommit, IndexerError> {
        let checkpoint_seq = data.checkpoint_summary.sequence_number;
        info!(checkpoint_seq, "Indexing checkpoint data blob");

        // Only genesis and end-of-epoch checkpoints yield `Some` here.
        let epoch = Self::index_epoch(state, data).await?;

        let object_changes: TransactionObjectChangesToCommit =
            Self::index_objects(data, &metrics).await?;
        let object_history_changes: TransactionObjectChangesToCommit =
            Self::index_objects_history(data).await?;
        let object_versions = Self::derive_object_versions(&object_history_changes);

        let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = {
            let CheckpointData {
                transactions,
                checkpoint_summary,
                checkpoint_contents,
            } = data;

            let (db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) =
                Self::index_transactions(
                    transactions,
                    checkpoint_summary,
                    checkpoint_contents,
                    &metrics,
                )
                .await?;

            // Total successfully executed transactions in this checkpoint.
            let successful_tx_num: u64 = db_transactions.iter().map(|t| t.successful_tx_num).sum();
            (
                IndexedCheckpoint::from_iota_checkpoint(
                    checkpoint_summary,
                    checkpoint_contents,
                    successful_tx_num as usize,
                ),
                db_transactions,
                db_events,
                db_tx_indices,
                db_event_indices,
                db_displays,
            )
        };
        // Indexing lag relative to the checkpoint's on-chain timestamp.
        let time_now_ms = chrono::Utc::now().timestamp_millis();
        metrics
            .index_lag_ms
            .set(time_now_ms - checkpoint.timestamp_ms as i64);
        metrics
            .max_indexed_checkpoint_sequence_number
            .set(checkpoint.sequence_number as i64);
        metrics
            .indexed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        info!(
            "Indexer lag: indexed checkpoint {} with time now {} and checkpoint time {}",
            checkpoint.sequence_number, time_now_ms, checkpoint.timestamp_ms
        );

        Ok(CheckpointDataToCommit {
            checkpoint,
            transactions: db_transactions,
            events: db_events,
            tx_indices: db_tx_indices,
            event_indices: db_event_indices,
            display_updates: db_displays,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            epoch,
        })
    }
346
    /// Indexes every transaction of a checkpoint, pairing each with its
    /// global sequence number from the checkpoint contents.
    ///
    /// Errors if the contents and the transaction list disagree in size or
    /// ordering (both indicate inconsistent data from the fullnode).
    async fn index_transactions(
        transactions: &[CheckpointTransaction],
        checkpoint_summary: &CertifiedCheckpointSummary,
        checkpoint_contents: &CheckpointContents,
        metrics: &IndexerMetrics,
    ) -> IndexerResult<(
        Vec<IndexedTransaction>,
        Vec<IndexedEvent>,
        Vec<TxIndex>,
        Vec<EventIndex>,
        BTreeMap<String, StoredDisplay>,
    )> {
        let checkpoint_seq = checkpoint_summary.sequence_number();

        // (digest, global tx sequence number) in checkpoint order; consumed
        // in lock-step with `transactions` below.
        let mut tx_seq_num_iter = checkpoint_contents
            .enumerate_transactions(checkpoint_summary)
            .map(|(seq, execution_digest)| (execution_digest.transaction, seq));

        if checkpoint_contents.size() != transactions.len() {
            return Err(IndexerError::FullNodeReading(format!(
                "CheckpointContents has different size {} compared to Transactions {} for checkpoint {}",
                checkpoint_contents.size(),
                transactions.len(),
                checkpoint_seq
            )));
        }

        let mut db_transactions = Vec::new();
        let mut db_events = Vec::new();
        let mut db_displays = BTreeMap::new();
        let mut db_tx_indices = Vec::new();
        let mut db_event_indices = Vec::new();

        for tx in transactions {
            // Safe: the size check above guarantees the iterator has exactly
            // as many entries as `transactions`.
            let (tx_digest, tx_sequence_number) = tx_seq_num_iter.next().unwrap();
            let actual_tx_digest = tx.transaction.digest();
            if tx_digest != *actual_tx_digest {
                return Err(IndexerError::FullNodeReading(format!(
                    "Transactions has different ordering from CheckpointContents, for checkpoint {}, Mismatch found at {} v.s. {}",
                    checkpoint_seq, tx_digest, actual_tx_digest,
                )));
            }

            let (indexed_tx, tx_indices, indexed_events, events_indices, stored_displays) =
                Self::index_transaction(
                    tx,
                    tx_sequence_number,
                    *checkpoint_seq,
                    checkpoint_summary.timestamp_ms,
                    metrics,
                )
                .await?;
            db_transactions.push(indexed_tx);
            db_tx_indices.push(tx_indices);
            db_events.extend(indexed_events);
            db_event_indices.extend(events_indices);
            db_displays.extend(stored_displays);
        }
        Ok((
            db_transactions,
            db_events,
            db_tx_indices,
            db_event_indices,
            db_displays,
        ))
    }
414
415 pub async fn index_transaction(
416 tx: &CheckpointTransaction,
417 tx_sequence_number: u64,
418 checkpoint_seq: CheckpointSequenceNumber,
419 checkpoint_timestamp_ms: u64,
420 metrics: &IndexerMetrics,
421 ) -> IndexerResult<(
422 IndexedTransaction,
423 TxIndex,
424 Vec<IndexedEvent>,
425 Vec<EventIndex>,
426 BTreeMap<String, StoredDisplay>,
427 )> {
428 let CheckpointTransaction {
429 transaction: sender_signed_data,
430 effects: fx,
431 events,
432 input_objects,
433 output_objects,
434 } = tx;
435
436 let tx_digest = sender_signed_data.digest();
437 let tx = sender_signed_data.transaction_data();
438 let events = events
439 .as_ref()
440 .map(|events| events.data.clone())
441 .unwrap_or_default();
442
443 let transaction_kind = IotaTransactionKind::from(tx.kind());
444
445 let db_events = events
446 .iter()
447 .enumerate()
448 .map(|(idx, event)| {
449 IndexedEvent::from_event(
450 tx_sequence_number,
451 idx as u64,
452 checkpoint_seq,
453 *tx_digest,
454 event,
455 checkpoint_timestamp_ms,
456 )
457 })
458 .collect();
459
460 let db_event_indices = events
461 .iter()
462 .enumerate()
463 .map(|(idx, event)| EventIndex::from_event(tx_sequence_number, idx as u64, event))
464 .collect();
465
466 let db_displays = events
467 .iter()
468 .flat_map(StoredDisplay::try_from_event)
469 .map(|display| (display.object_type.clone(), display))
470 .collect();
471
472 let objects = input_objects
473 .iter()
474 .chain(output_objects.iter())
475 .collect::<Vec<_>>();
476
477 let (balance_change, object_changes) = TxChangesProcessor::new(&objects, metrics.clone())
478 .get_changes(tx, fx, tx_digest)
479 .await?;
480
481 let db_txn = IndexedTransaction {
482 tx_sequence_number,
483 tx_digest: *tx_digest,
484 checkpoint_sequence_number: checkpoint_seq,
485 timestamp_ms: checkpoint_timestamp_ms,
486 sender_signed_data: sender_signed_data.data().clone(),
487 effects: fx.clone(),
488 object_changes,
489 balance_change,
490 events,
491 transaction_kind,
492 successful_tx_num: if fx.status().is_ok() {
493 tx.kind().tx_count() as u64
494 } else {
495 0
496 },
497 };
498
499 let input_objects = tx
501 .input_objects()
502 .expect("committed txns have been validated")
503 .into_iter()
504 .map(|obj_kind| obj_kind.object_id())
505 .collect::<Vec<_>>();
506
507 let changed_objects = fx
509 .all_changed_objects()
510 .into_iter()
511 .map(|(object_ref, _owner, _write_kind)| object_ref.0)
512 .collect::<Vec<_>>();
513
514 let payers = vec![tx.gas_owner()];
516
517 let sender = tx.sender();
519
520 let recipients = fx
522 .all_changed_objects()
523 .into_iter()
524 .filter_map(|(_object_ref, owner, _write_kind)| match owner {
525 Owner::AddressOwner(address) => Some(address),
526 _ => None,
527 })
528 .unique()
529 .collect::<Vec<_>>();
530
531 let move_calls = tx
533 .move_calls()
534 .iter()
535 .map(|(p, m, f)| (*<&ObjectID>::clone(p), m.to_string(), f.to_string()))
536 .collect();
537
538 let db_tx_indices = TxIndex {
539 tx_sequence_number,
540 transaction_digest: *tx_digest,
541 checkpoint_sequence_number: checkpoint_seq,
542 input_objects,
543 changed_objects,
544 sender,
545 payers,
546 recipients,
547 move_calls,
548 tx_kind: transaction_kind,
549 };
550
551 Ok((
552 db_txn,
553 db_tx_indices,
554 db_events,
555 db_event_indices,
556 db_displays,
557 ))
558 }
559
560 pub(crate) async fn index_objects(
561 data: &CheckpointData,
562 metrics: &IndexerMetrics,
563 ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
564 let _timer = metrics.indexing_objects_latency.start_timer();
565 let checkpoint_seq = data.checkpoint_summary.sequence_number;
566
567 let eventually_removed_object_refs_post_version =
568 data.eventually_removed_object_refs_post_version();
569 let indexed_eventually_removed_objects = eventually_removed_object_refs_post_version
570 .into_iter()
571 .map(|obj_ref| IndexedDeletedObject {
572 object_id: obj_ref.0,
573 object_version: obj_ref.1.into(),
574 checkpoint_sequence_number: checkpoint_seq,
575 })
576 .collect();
577
578 let latest_live_output_objects = data.latest_live_output_objects();
579 let changed_objects = latest_live_output_objects
580 .into_iter()
581 .map(|o| {
582 try_extract_df_kind(o)
583 .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
584 })
585 .collect::<Result<Vec<_>, _>>()?;
586 Ok(TransactionObjectChangesToCommit {
587 changed_objects,
588 deleted_objects: indexed_eventually_removed_objects,
589 })
590 }
591
592 async fn index_objects_history(
594 data: &CheckpointData,
595 ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
596 let checkpoint_seq = data.checkpoint_summary.sequence_number;
597 let deleted_objects = data
598 .transactions
599 .iter()
600 .flat_map(|tx| tx.removed_object_refs_post_version())
601 .collect::<Vec<_>>();
602 let indexed_deleted_objects: Vec<IndexedDeletedObject> = deleted_objects
603 .into_iter()
604 .map(|obj_ref| IndexedDeletedObject {
605 object_id: obj_ref.0,
606 object_version: obj_ref.1.into(),
607 checkpoint_sequence_number: checkpoint_seq,
608 })
609 .collect();
610
611 let output_objects: Vec<_> = data
612 .transactions
613 .iter()
614 .flat_map(|tx| &tx.output_objects)
615 .collect();
616 let changed_objects = output_objects
619 .into_iter()
620 .map(|o| {
621 try_extract_df_kind(o)
622 .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
623 })
624 .collect::<Result<Vec<_>, _>>()?;
625
626 Ok(TransactionObjectChangesToCommit {
627 changed_objects,
628 deleted_objects: indexed_deleted_objects,
629 })
630 }
631
632 fn index_packages(
633 checkpoint_data: &[CheckpointData],
634 metrics: &IndexerMetrics,
635 ) -> Vec<IndexedPackage> {
636 let _timer = metrics.indexing_packages_latency.start_timer();
637 checkpoint_data
638 .iter()
639 .flat_map(|data| {
640 let checkpoint_sequence_number = data.checkpoint_summary.sequence_number;
641 data.transactions
642 .iter()
643 .flat_map(|tx| &tx.output_objects)
644 .filter_map(|o| {
645 if let iota_types::object::Data::Package(p) = &o.data {
646 Some(IndexedPackage {
647 package_id: o.id(),
648 move_package: p.clone(),
649 checkpoint_sequence_number,
650 })
651 } else {
652 None
653 }
654 })
655 .collect::<Vec<_>>()
656 })
657 .collect()
658 }
659
660 pub(crate) fn pg_blocking_cp(state: PgIndexerStore) -> Result<ConnectionPool, IndexerError> {
661 let state_as_any = state.as_any();
662 if let Some(pg_state) = state_as_any.downcast_ref::<PgIndexerStore>() {
663 return Ok(pg_state.blocking_cp());
664 }
665 Err(IndexerError::Uncategorized(anyhow::anyhow!(
666 "Failed to downcast state to PgIndexerStore"
667 )))
668 }
669}
670
671fn try_extract_df_kind(o: &Object) -> IndexerResult<Option<DynamicFieldType>> {
674 let Some(move_object) = o.data.try_as_move() else {
676 return Ok(None);
677 };
678
679 if !move_object.type_().is_dynamic_field() {
680 return Ok(None);
681 }
682
683 let type_: StructTag = move_object.type_().clone().into();
684 let [name, _] = type_.type_params.as_slice() else {
685 return Ok(None);
686 };
687
688 Ok(Some(
689 if matches!(name, TypeTag::Struct(s) if DynamicFieldInfo::is_dynamic_object_field_wrapper(s))
690 {
691 DynamicFieldType::DynamicObject
692 } else {
693 DynamicFieldType::DynamicField
694 },
695 ))
696}