iota_indexer/handlers/
checkpoint_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::BTreeMap, slice, sync::Arc};
6
7use async_trait::async_trait;
8use iota_data_ingestion_core::Worker;
9use iota_json_rpc_types::IotaTransactionKind;
10use iota_metrics::{get_metrics, spawn_monitored_task};
11use iota_rest_api::{CheckpointData, CheckpointTransaction};
12use iota_types::{
13    base_types::ObjectID,
14    dynamic_field::{DynamicFieldInfo, DynamicFieldType},
15    effects::TransactionEffectsAPI,
16    event::{SystemEpochInfoEvent, SystemEpochInfoEventV1, SystemEpochInfoEventV2},
17    iota_system_state::{IotaSystemStateTrait, get_iota_system_state},
18    messages_checkpoint::{
19        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
20    },
21    object::{Object, Owner},
22    transaction::TransactionDataAPI,
23};
24use itertools::Itertools;
25use move_core_types::{
26    self,
27    language_storage::{StructTag, TypeTag},
28};
29use tokio_util::sync::CancellationToken;
30use tracing::info;
31
32use crate::{
33    db::ConnectionPool,
34    errors::IndexerError,
35    handlers::{
36        CheckpointDataToCommit, EpochToCommit, TransactionObjectChangesToCommit,
37        committer::start_tx_checkpoint_commit_task,
38        tx_processor::{EpochEndIndexingObjectStore, TxChangesProcessor},
39    },
40    metrics::IndexerMetrics,
41    models::{display::StoredDisplay, obj_indices::StoredObjectVersion},
42    store::{IndexerStore, PgIndexerStore},
43    types::{
44        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent,
45        IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult,
46        IotaSystemStateSummaryView, TxIndex,
47    },
48};
49
/// Default capacity of the indexed-checkpoint channel, used when the
/// `CHECKPOINT_QUEUE_SIZE` environment variable does not provide a value.
const CHECKPOINT_QUEUE_SIZE: usize = 100;
51
52pub async fn new_handlers(
53    state: PgIndexerStore,
54    metrics: IndexerMetrics,
55    next_checkpoint_sequence_number: CheckpointSequenceNumber,
56    cancel: CancellationToken,
57) -> Result<CheckpointHandler, IndexerError> {
58    let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE")
59        .unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string())
60        .parse::<usize>()
61        .unwrap();
62    let global_metrics = get_metrics().unwrap();
63    let (indexed_checkpoint_sender, indexed_checkpoint_receiver) =
64        iota_metrics::metered_channel::channel(
65            checkpoint_queue_size,
66            &global_metrics
67                .channel_inflight
68                .with_label_values(&["checkpoint_indexing"]),
69        );
70
71    let state_clone = state.clone();
72    let metrics_clone = metrics.clone();
73    spawn_monitored_task!(start_tx_checkpoint_commit_task(
74        state_clone,
75        metrics_clone,
76        indexed_checkpoint_receiver,
77        next_checkpoint_sequence_number,
78        cancel.clone()
79    ));
80    Ok(CheckpointHandler::new(
81        state,
82        metrics,
83        indexed_checkpoint_sender,
84    ))
85}
86
87pub struct CheckpointHandler {
88    state: PgIndexerStore,
89    metrics: IndexerMetrics,
90    indexed_checkpoint_sender: iota_metrics::metered_channel::Sender<CheckpointDataToCommit>,
91}
92
93#[async_trait]
94impl Worker for CheckpointHandler {
95    type Message = ();
96    type Error = IndexerError;
97
98    async fn process_checkpoint(
99        &self,
100        checkpoint: Arc<CheckpointData>,
101    ) -> Result<Self::Message, Self::Error> {
102        self.metrics
103            .latest_fullnode_checkpoint_sequence_number
104            .set(checkpoint.checkpoint_summary.sequence_number as i64);
105        let time_now_ms = chrono::Utc::now().timestamp_millis();
106        let cp_download_lag = time_now_ms - checkpoint.checkpoint_summary.timestamp_ms as i64;
107        info!(
108            "checkpoint download lag for cp {}: {} ms",
109            checkpoint.checkpoint_summary.sequence_number, cp_download_lag
110        );
111        self.metrics.download_lag_ms.set(cp_download_lag);
112        self.metrics
113            .max_downloaded_checkpoint_sequence_number
114            .set(checkpoint.checkpoint_summary.sequence_number as i64);
115        self.metrics
116            .downloaded_checkpoint_timestamp_ms
117            .set(checkpoint.checkpoint_summary.timestamp_ms as i64);
118        info!(
119            "Indexer lag: downloaded checkpoint {} with time now {} and checkpoint time {}",
120            checkpoint.checkpoint_summary.sequence_number,
121            time_now_ms,
122            checkpoint.checkpoint_summary.timestamp_ms
123        );
124
125        let checkpoint_data = Self::index_checkpoint(
126            self.state.clone().into(),
127            &checkpoint,
128            Arc::new(self.metrics.clone()),
129            Self::index_packages(slice::from_ref(&checkpoint), &self.metrics),
130        )
131        .await?;
132        self.indexed_checkpoint_sender
133            .send(checkpoint_data)
134            .await
135            .map_err(|_| {
136                IndexerError::MpscChannel(
137                    "Failed to send checkpoint data, receiver half closed".into(),
138                )
139            })?;
140        Ok(())
141    }
142}
143
144impl CheckpointHandler {
145    fn new(
146        state: PgIndexerStore,
147        metrics: IndexerMetrics,
148        indexed_checkpoint_sender: iota_metrics::metered_channel::Sender<CheckpointDataToCommit>,
149    ) -> Self {
150        Self {
151            state,
152            metrics,
153            indexed_checkpoint_sender,
154        }
155    }
156
157    async fn index_epoch(
158        state: Arc<PgIndexerStore>,
159        data: &CheckpointData,
160    ) -> Result<Option<EpochToCommit>, IndexerError> {
161        let checkpoint_object_store = EpochEndIndexingObjectStore::new(data);
162
163        let CheckpointData {
164            transactions,
165            checkpoint_summary,
166            checkpoint_contents: _,
167        } = data;
168
169        // Genesis epoch
170        if *checkpoint_summary.sequence_number() == 0 {
171            info!("Processing genesis epoch");
172            let system_state =
173                get_iota_system_state(&checkpoint_object_store)?.into_iota_system_state_summary();
174            return Ok(Some(EpochToCommit {
175                last_epoch: None,
176                new_epoch: IndexedEpochInfo::from_new_system_state_summary(
177                    &system_state,
178                    0, // first_checkpoint_id
179                    None,
180                ),
181                network_total_transactions: 0,
182            }));
183        }
184
185        // If not end of epoch, return
186        if checkpoint_summary.end_of_epoch_data.is_none() {
187            return Ok(None);
188        }
189
190        let system_state =
191            get_iota_system_state(&checkpoint_object_store)?.into_iota_system_state_summary();
192        let event = transactions
193            .iter()
194            .flat_map(|t| t.events.as_ref().map(|e| &e.data))
195            .flatten()
196            .find(|ev| ev.is_system_epoch_info_event_v1() || ev.is_system_epoch_info_event_v2())
197            .map(|ev| {
198                if ev.is_system_epoch_info_event_v2() {
199                    SystemEpochInfoEvent::V2(
200                        bcs::from_bytes::<SystemEpochInfoEventV2>(&ev.contents).expect(
201                            "event deserialization should succeed as type was pre-validated",
202                        ),
203                    )
204                } else {
205                    SystemEpochInfoEvent::V1(
206                        bcs::from_bytes::<SystemEpochInfoEventV1>(&ev.contents).expect(
207                            "event deserialization should succeed as type was pre-validated",
208                        ),
209                    )
210                }
211            })
212            .unwrap_or_else(|| {
213                panic!(
214                    "Can't find SystemEpochInfoEvent in epoch end checkpoint {}",
215                    checkpoint_summary.sequence_number()
216                )
217            });
218
219        // Now we just entered epoch X, we want to calculate the diff between
220        // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2).
221        // Note that on the indexer's chain-reading side, this is not guaranteed
222        // to have the latest data. Rather than impose a wait on the reading
223        // side, however, we overwrite this on the persisting side, where we can
224        // guarantee that the previous epoch's checkpoints have been written to
225        // db.
226
227        let epoch = system_state.epoch();
228        let network_tx_count_prev_epoch = match epoch {
229            // If first epoch change, this number is 0
230            1 => Ok(0),
231            _ => {
232                let last_epoch = epoch - 2;
233                state
234                    .get_network_total_transactions_by_end_of_epoch(last_epoch)
235                    .await
236            }
237        }?;
238
239        Ok(Some(EpochToCommit {
240            last_epoch: Some(IndexedEpochInfo::from_end_of_epoch_data(
241                &system_state,
242                checkpoint_summary,
243                &event,
244                network_tx_count_prev_epoch,
245            )),
246            new_epoch: IndexedEpochInfo::from_new_system_state_summary(
247                &system_state,
248                checkpoint_summary.sequence_number + 1, // first_checkpoint_id
249                Some(&event),
250            ),
251            network_total_transactions: checkpoint_summary.network_total_transactions,
252        }))
253    }
254
255    fn derive_object_versions(
256        object_history_changes: &TransactionObjectChangesToCommit,
257    ) -> Vec<StoredObjectVersion> {
258        let mut object_versions = vec![];
259        for changed_obj in object_history_changes.changed_objects.iter() {
260            object_versions.push(changed_obj.into());
261        }
262        for deleted_obj in object_history_changes.deleted_objects.iter() {
263            object_versions.push(deleted_obj.into());
264        }
265        object_versions
266    }
267
268    async fn index_checkpoint(
269        state: Arc<PgIndexerStore>,
270        data: &CheckpointData,
271        metrics: Arc<IndexerMetrics>,
272        packages: Vec<IndexedPackage>,
273    ) -> Result<CheckpointDataToCommit, IndexerError> {
274        let checkpoint_seq = data.checkpoint_summary.sequence_number;
275        info!(checkpoint_seq, "Indexing checkpoint data blob");
276
277        // Index epoch
278        let epoch = Self::index_epoch(state, data).await?;
279
280        // Index Objects
281        let object_changes: TransactionObjectChangesToCommit =
282            Self::index_objects(data, &metrics).await?;
283        let object_history_changes: TransactionObjectChangesToCommit =
284            Self::index_objects_history(data).await?;
285        let object_versions = Self::derive_object_versions(&object_history_changes);
286
287        let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = {
288            let CheckpointData {
289                transactions,
290                checkpoint_summary,
291                checkpoint_contents,
292            } = data;
293
294            let (db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) =
295                Self::index_transactions(
296                    transactions,
297                    checkpoint_summary,
298                    checkpoint_contents,
299                    &metrics,
300                )
301                .await?;
302
303            let successful_tx_num: u64 = db_transactions.iter().map(|t| t.successful_tx_num).sum();
304            (
305                IndexedCheckpoint::from_iota_checkpoint(
306                    checkpoint_summary,
307                    checkpoint_contents,
308                    successful_tx_num as usize,
309                ),
310                db_transactions,
311                db_events,
312                db_tx_indices,
313                db_event_indices,
314                db_displays,
315            )
316        };
317        let time_now_ms = chrono::Utc::now().timestamp_millis();
318        metrics
319            .index_lag_ms
320            .set(time_now_ms - checkpoint.timestamp_ms as i64);
321        metrics
322            .max_indexed_checkpoint_sequence_number
323            .set(checkpoint.sequence_number as i64);
324        metrics
325            .indexed_checkpoint_timestamp_ms
326            .set(checkpoint.timestamp_ms as i64);
327        info!(
328            "Indexer lag: indexed checkpoint {} with time now {} and checkpoint time {}",
329            checkpoint.sequence_number, time_now_ms, checkpoint.timestamp_ms
330        );
331
332        Ok(CheckpointDataToCommit {
333            checkpoint,
334            transactions: db_transactions,
335            events: db_events,
336            tx_indices: db_tx_indices,
337            event_indices: db_event_indices,
338            display_updates: db_displays,
339            object_changes,
340            object_history_changes,
341            object_versions,
342            packages,
343            epoch,
344        })
345    }
346
347    async fn index_transactions(
348        transactions: &[CheckpointTransaction],
349        checkpoint_summary: &CertifiedCheckpointSummary,
350        checkpoint_contents: &CheckpointContents,
351        metrics: &IndexerMetrics,
352    ) -> IndexerResult<(
353        Vec<IndexedTransaction>,
354        Vec<IndexedEvent>,
355        Vec<TxIndex>,
356        Vec<EventIndex>,
357        BTreeMap<String, StoredDisplay>,
358    )> {
359        let checkpoint_seq = checkpoint_summary.sequence_number();
360
361        let mut tx_seq_num_iter = checkpoint_contents
362            .enumerate_transactions(checkpoint_summary)
363            .map(|(seq, execution_digest)| (execution_digest.transaction, seq));
364
365        if checkpoint_contents.size() != transactions.len() {
366            return Err(IndexerError::FullNodeReading(format!(
367                "CheckpointContents has different size {} compared to Transactions {} for checkpoint {}",
368                checkpoint_contents.size(),
369                transactions.len(),
370                checkpoint_seq
371            )));
372        }
373
374        let mut db_transactions = Vec::new();
375        let mut db_events = Vec::new();
376        let mut db_displays = BTreeMap::new();
377        let mut db_tx_indices = Vec::new();
378        let mut db_event_indices = Vec::new();
379
380        for tx in transactions {
381            // Unwrap safe - we checked they have equal length above
382            let (tx_digest, tx_sequence_number) = tx_seq_num_iter.next().unwrap();
383            let actual_tx_digest = tx.transaction.digest();
384            if tx_digest != *actual_tx_digest {
385                return Err(IndexerError::FullNodeReading(format!(
386                    "Transactions has different ordering from CheckpointContents, for checkpoint {}, Mismatch found at {} v.s. {}",
387                    checkpoint_seq, tx_digest, actual_tx_digest,
388                )));
389            }
390
391            let (indexed_tx, tx_indices, indexed_events, events_indices, stored_displays) =
392                Self::index_transaction(
393                    tx,
394                    tx_sequence_number,
395                    *checkpoint_seq,
396                    checkpoint_summary.timestamp_ms,
397                    metrics,
398                )
399                .await?;
400            db_transactions.push(indexed_tx);
401            db_tx_indices.push(tx_indices);
402            db_events.extend(indexed_events);
403            db_event_indices.extend(events_indices);
404            db_displays.extend(stored_displays);
405        }
406        Ok((
407            db_transactions,
408            db_events,
409            db_tx_indices,
410            db_event_indices,
411            db_displays,
412        ))
413    }
414
415    pub async fn index_transaction(
416        tx: &CheckpointTransaction,
417        tx_sequence_number: u64,
418        checkpoint_seq: CheckpointSequenceNumber,
419        checkpoint_timestamp_ms: u64,
420        metrics: &IndexerMetrics,
421    ) -> IndexerResult<(
422        IndexedTransaction,
423        TxIndex,
424        Vec<IndexedEvent>,
425        Vec<EventIndex>,
426        BTreeMap<String, StoredDisplay>,
427    )> {
428        let CheckpointTransaction {
429            transaction: sender_signed_data,
430            effects: fx,
431            events,
432            input_objects,
433            output_objects,
434        } = tx;
435
436        let tx_digest = sender_signed_data.digest();
437        let tx = sender_signed_data.transaction_data();
438        let events = events
439            .as_ref()
440            .map(|events| events.data.clone())
441            .unwrap_or_default();
442
443        let transaction_kind = IotaTransactionKind::from(tx.kind());
444
445        let db_events = events
446            .iter()
447            .enumerate()
448            .map(|(idx, event)| {
449                IndexedEvent::from_event(
450                    tx_sequence_number,
451                    idx as u64,
452                    checkpoint_seq,
453                    *tx_digest,
454                    event,
455                    checkpoint_timestamp_ms,
456                )
457            })
458            .collect();
459
460        let db_event_indices = events
461            .iter()
462            .enumerate()
463            .map(|(idx, event)| EventIndex::from_event(tx_sequence_number, idx as u64, event))
464            .collect();
465
466        let db_displays = events
467            .iter()
468            .flat_map(StoredDisplay::try_from_event)
469            .map(|display| (display.object_type.clone(), display))
470            .collect();
471
472        let objects = input_objects
473            .iter()
474            .chain(output_objects.iter())
475            .collect::<Vec<_>>();
476
477        let (balance_change, object_changes) = TxChangesProcessor::new(&objects, metrics.clone())
478            .get_changes(tx, fx, tx_digest)
479            .await?;
480
481        let db_txn = IndexedTransaction {
482            tx_sequence_number,
483            tx_digest: *tx_digest,
484            checkpoint_sequence_number: checkpoint_seq,
485            timestamp_ms: checkpoint_timestamp_ms,
486            sender_signed_data: sender_signed_data.data().clone(),
487            effects: fx.clone(),
488            object_changes,
489            balance_change,
490            events,
491            transaction_kind,
492            successful_tx_num: if fx.status().is_ok() {
493                tx.kind().tx_count() as u64
494            } else {
495                0
496            },
497        };
498
499        // Input Objects
500        let input_objects = tx
501            .input_objects()
502            .expect("committed txns have been validated")
503            .into_iter()
504            .map(|obj_kind| obj_kind.object_id())
505            .collect::<Vec<_>>();
506
507        // Changed Objects
508        let changed_objects = fx
509            .all_changed_objects()
510            .into_iter()
511            .map(|(object_ref, _owner, _write_kind)| object_ref.0)
512            .collect::<Vec<_>>();
513
514        // Payers
515        let payers = vec![tx.gas_owner()];
516
517        // Sender
518        let sender = tx.sender();
519
520        // Recipients
521        let recipients = fx
522            .all_changed_objects()
523            .into_iter()
524            .filter_map(|(_object_ref, owner, _write_kind)| match owner {
525                Owner::AddressOwner(address) => Some(address),
526                _ => None,
527            })
528            .unique()
529            .collect::<Vec<_>>();
530
531        // Move Calls
532        let move_calls = tx
533            .move_calls()
534            .iter()
535            .map(|(p, m, f)| (*<&ObjectID>::clone(p), m.to_string(), f.to_string()))
536            .collect();
537
538        let db_tx_indices = TxIndex {
539            tx_sequence_number,
540            transaction_digest: *tx_digest,
541            checkpoint_sequence_number: checkpoint_seq,
542            input_objects,
543            changed_objects,
544            sender,
545            payers,
546            recipients,
547            move_calls,
548            tx_kind: transaction_kind,
549        };
550
551        Ok((
552            db_txn,
553            db_tx_indices,
554            db_events,
555            db_event_indices,
556            db_displays,
557        ))
558    }
559
560    pub(crate) async fn index_objects(
561        data: &CheckpointData,
562        metrics: &IndexerMetrics,
563    ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
564        let _timer = metrics.indexing_objects_latency.start_timer();
565        let checkpoint_seq = data.checkpoint_summary.sequence_number;
566
567        let eventually_removed_object_refs_post_version =
568            data.eventually_removed_object_refs_post_version();
569        let indexed_eventually_removed_objects = eventually_removed_object_refs_post_version
570            .into_iter()
571            .map(|obj_ref| IndexedDeletedObject {
572                object_id: obj_ref.0,
573                object_version: obj_ref.1.into(),
574                checkpoint_sequence_number: checkpoint_seq,
575            })
576            .collect();
577
578        let latest_live_output_objects = data.latest_live_output_objects();
579        let changed_objects = latest_live_output_objects
580            .into_iter()
581            .map(|o| {
582                try_extract_df_kind(o)
583                    .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
584            })
585            .collect::<Result<Vec<_>, _>>()?;
586        Ok(TransactionObjectChangesToCommit {
587            changed_objects,
588            deleted_objects: indexed_eventually_removed_objects,
589        })
590    }
591
592    // similar to index_objects, but objects_history keeps all versions of objects
593    async fn index_objects_history(
594        data: &CheckpointData,
595    ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
596        let checkpoint_seq = data.checkpoint_summary.sequence_number;
597        let deleted_objects = data
598            .transactions
599            .iter()
600            .flat_map(|tx| tx.removed_object_refs_post_version())
601            .collect::<Vec<_>>();
602        let indexed_deleted_objects: Vec<IndexedDeletedObject> = deleted_objects
603            .into_iter()
604            .map(|obj_ref| IndexedDeletedObject {
605                object_id: obj_ref.0,
606                object_version: obj_ref.1.into(),
607                checkpoint_sequence_number: checkpoint_seq,
608            })
609            .collect();
610
611        let output_objects: Vec<_> = data
612            .transactions
613            .iter()
614            .flat_map(|tx| &tx.output_objects)
615            .collect();
616        // TODO(gegaowp): the current df_info implementation is not correct,
617        // but we have decided remove all df_* except df_kind.
618        let changed_objects = output_objects
619            .into_iter()
620            .map(|o| {
621                try_extract_df_kind(o)
622                    .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
623            })
624            .collect::<Result<Vec<_>, _>>()?;
625
626        Ok(TransactionObjectChangesToCommit {
627            changed_objects,
628            deleted_objects: indexed_deleted_objects,
629        })
630    }
631
632    fn index_packages(
633        checkpoint_data: &[CheckpointData],
634        metrics: &IndexerMetrics,
635    ) -> Vec<IndexedPackage> {
636        let _timer = metrics.indexing_packages_latency.start_timer();
637        checkpoint_data
638            .iter()
639            .flat_map(|data| {
640                let checkpoint_sequence_number = data.checkpoint_summary.sequence_number;
641                data.transactions
642                    .iter()
643                    .flat_map(|tx| &tx.output_objects)
644                    .filter_map(|o| {
645                        if let iota_types::object::Data::Package(p) = &o.data {
646                            Some(IndexedPackage {
647                                package_id: o.id(),
648                                move_package: p.clone(),
649                                checkpoint_sequence_number,
650                            })
651                        } else {
652                            None
653                        }
654                    })
655                    .collect::<Vec<_>>()
656            })
657            .collect()
658    }
659
660    pub(crate) fn pg_blocking_cp(state: PgIndexerStore) -> Result<ConnectionPool, IndexerError> {
661        let state_as_any = state.as_any();
662        if let Some(pg_state) = state_as_any.downcast_ref::<PgIndexerStore>() {
663            return Ok(pg_state.blocking_cp());
664        }
665        Err(IndexerError::Uncategorized(anyhow::anyhow!(
666            "Failed to downcast state to PgIndexerStore"
667        )))
668    }
669}
670
671/// If `o` is a dynamic `Field<K, V>`, determine whether it represents a Dynamic
672/// Field or a Dynamic Object Field based on its type.
673fn try_extract_df_kind(o: &Object) -> IndexerResult<Option<DynamicFieldType>> {
674    // Skip if not a move object
675    let Some(move_object) = o.data.try_as_move() else {
676        return Ok(None);
677    };
678
679    if !move_object.type_().is_dynamic_field() {
680        return Ok(None);
681    }
682
683    let type_: StructTag = move_object.type_().clone().into();
684    let [name, _] = type_.type_params.as_slice() else {
685        return Ok(None);
686    };
687
688    Ok(Some(
689        if matches!(name, TypeTag::Struct(s) if DynamicFieldInfo::is_dynamic_object_field_wrapper(s))
690        {
691            DynamicFieldType::DynamicObject
692        } else {
693            DynamicFieldType::DynamicField
694        },
695    ))
696}