iota_indexer/handlers/
objects_snapshot_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use iota_data_ingestion_core::Worker;
9use iota_metrics::{get_metrics, metered_channel::Sender, spawn_monitored_task};
10use iota_rest_api::CheckpointData;
11use tokio_util::sync::CancellationToken;
12use tracing::info;
13
14use super::{
15    CommonHandler, Handler, TransactionObjectChangesToCommit, checkpoint_handler::CheckpointHandler,
16};
17use crate::{
18    errors::IndexerError,
19    metrics::IndexerMetrics,
20    store::{IndexerStore, PgIndexerStore},
21    types::IndexerResult,
22};
23
24#[derive(Clone)]
25pub struct ObjectsSnapshotHandler {
26    pub store: PgIndexerStore,
27    pub sender: Sender<(u64, TransactionObjectChangesToCommit)>,
28    snapshot_config: SnapshotLagConfig,
29    metrics: IndexerMetrics,
30}
31
32pub struct CheckpointObjectChanges {
33    pub checkpoint_sequence_number: u64,
34    pub object_changes: TransactionObjectChangesToCommit,
35}
36
37#[derive(Debug, Clone)]
38pub struct SnapshotLagConfig {
39    pub snapshot_min_lag: usize,
40    pub sleep_duration: u64,
41}
42
43impl SnapshotLagConfig {
44    const DEFAULT_MIN_LAG: usize = 300;
45    const DEFAULT_SLEEP_DURATION_SEC: u64 = 5;
46}
47
48impl SnapshotLagConfig {
49    pub fn new(min_lag: Option<usize>, sleep_duration: Option<u64>) -> Self {
50        let default_config = Self::default();
51        Self {
52            snapshot_min_lag: min_lag.unwrap_or(default_config.snapshot_min_lag),
53            sleep_duration: sleep_duration.unwrap_or(default_config.sleep_duration),
54        }
55    }
56}
57
58impl Default for SnapshotLagConfig {
59    fn default() -> Self {
60        SnapshotLagConfig {
61            snapshot_min_lag: Self::DEFAULT_MIN_LAG,
62            sleep_duration: Self::DEFAULT_SLEEP_DURATION_SEC,
63        }
64    }
65}
66
67#[async_trait]
68impl Worker for ObjectsSnapshotHandler {
69    type Message = ();
70    type Error = IndexerError;
71
72    async fn process_checkpoint(
73        &self,
74        checkpoint: Arc<CheckpointData>,
75    ) -> Result<Self::Message, Self::Error> {
76        let transformed_data = CheckpointHandler::index_objects(&checkpoint, &self.metrics).await?;
77        self.sender
78            .send((
79                checkpoint.checkpoint_summary.sequence_number,
80                transformed_data,
81            ))
82            .await
83            .map_err(|_| {
84                IndexerError::MpscChannel(
85                    "Failed to send checkpoint object changes, receiver half closed".into(),
86                )
87            })?;
88        Ok(())
89    }
90}
91
92#[async_trait]
93impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {
94    fn name(&self) -> String {
95        "objects_snapshot_handler".to_string()
96    }
97
98    async fn load(
99        &self,
100        transformed_data: Vec<TransactionObjectChangesToCommit>,
101    ) -> IndexerResult<()> {
102        self.store
103            .persist_objects_snapshot(transformed_data)
104            .await?;
105        Ok(())
106    }
107
108    // TODO: read watermark table when it's ready.
109    async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>> {
110        self.store
111            .get_latest_object_snapshot_checkpoint_sequence_number()
112            .await
113    }
114
115    // TODO: update watermark table when it's ready.
116    async fn set_watermark_hi(&self, watermark_hi: u64) -> IndexerResult<()> {
117        self.metrics
118            .latest_object_snapshot_sequence_number
119            .set(watermark_hi as i64);
120        Ok(())
121    }
122
123    async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
124        let latest_checkpoint = self.store.get_latest_checkpoint_sequence_number().await?;
125        Ok(latest_checkpoint
126            .map(|seq| seq.saturating_sub(self.snapshot_config.snapshot_min_lag as u64))
127            .unwrap_or_default()) // hold snapshot handler until at least one checkpoint is in DB
128    }
129}
130
131pub async fn start_objects_snapshot_handler(
132    store: PgIndexerStore,
133    metrics: IndexerMetrics,
134    snapshot_config: SnapshotLagConfig,
135    cancel: CancellationToken,
136) -> IndexerResult<(ObjectsSnapshotHandler, u64)> {
137    info!("Starting object snapshot handler...");
138
139    let global_metrics = get_metrics().unwrap();
140    let (sender, receiver) = iota_metrics::metered_channel::channel(
141        600,
142        &global_metrics
143            .channel_inflight
144            .with_label_values(&["objects_snapshot_handler_checkpoint_data"]),
145    );
146
147    let objects_snapshot_handler =
148        ObjectsSnapshotHandler::new(store.clone(), sender, metrics.clone(), snapshot_config);
149
150    let watermark_hi = objects_snapshot_handler.get_watermark_hi().await?;
151    let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
152    spawn_monitored_task!(common_handler.start_transform_and_load(receiver, cancel));
153    Ok((objects_snapshot_handler, watermark_hi.unwrap_or_default()))
154}
155
156impl ObjectsSnapshotHandler {
157    pub fn new(
158        store: PgIndexerStore,
159        sender: Sender<(u64, TransactionObjectChangesToCommit)>,
160        metrics: IndexerMetrics,
161        snapshot_config: SnapshotLagConfig,
162    ) -> ObjectsSnapshotHandler {
163        Self {
164            store,
165            sender,
166            metrics,
167            snapshot_config,
168        }
169    }
170}