iota_indexer/handlers/
objects_snapshot_handler.rs1use std::sync::Arc;
6
7use async_trait::async_trait;
8use iota_data_ingestion_core::Worker;
9use iota_metrics::{get_metrics, metered_channel::Sender, spawn_monitored_task};
10use iota_rest_api::CheckpointData;
11use tokio_util::sync::CancellationToken;
12use tracing::info;
13
14use super::{
15 CommonHandler, Handler, TransactionObjectChangesToCommit, checkpoint_handler::CheckpointHandler,
16};
17use crate::{
18 errors::IndexerError,
19 metrics::IndexerMetrics,
20 store::{IndexerStore, PgIndexerStore},
21 types::IndexerResult,
22};
23
24#[derive(Clone)]
25pub struct ObjectsSnapshotHandler {
26 pub store: PgIndexerStore,
27 pub sender: Sender<(u64, TransactionObjectChangesToCommit)>,
28 snapshot_config: SnapshotLagConfig,
29 metrics: IndexerMetrics,
30}
31
32pub struct CheckpointObjectChanges {
33 pub checkpoint_sequence_number: u64,
34 pub object_changes: TransactionObjectChangesToCommit,
35}
36
37#[derive(Debug, Clone)]
38pub struct SnapshotLagConfig {
39 pub snapshot_min_lag: usize,
40 pub sleep_duration: u64,
41}
42
43impl SnapshotLagConfig {
44 const DEFAULT_MIN_LAG: usize = 300;
45 const DEFAULT_SLEEP_DURATION_SEC: u64 = 5;
46}
47
48impl SnapshotLagConfig {
49 pub fn new(min_lag: Option<usize>, sleep_duration: Option<u64>) -> Self {
50 let default_config = Self::default();
51 Self {
52 snapshot_min_lag: min_lag.unwrap_or(default_config.snapshot_min_lag),
53 sleep_duration: sleep_duration.unwrap_or(default_config.sleep_duration),
54 }
55 }
56}
57
58impl Default for SnapshotLagConfig {
59 fn default() -> Self {
60 SnapshotLagConfig {
61 snapshot_min_lag: Self::DEFAULT_MIN_LAG,
62 sleep_duration: Self::DEFAULT_SLEEP_DURATION_SEC,
63 }
64 }
65}
66
67#[async_trait]
68impl Worker for ObjectsSnapshotHandler {
69 type Message = ();
70 type Error = IndexerError;
71
72 async fn process_checkpoint(
73 &self,
74 checkpoint: Arc<CheckpointData>,
75 ) -> Result<Self::Message, Self::Error> {
76 let transformed_data = CheckpointHandler::index_objects(&checkpoint, &self.metrics).await?;
77 self.sender
78 .send((
79 checkpoint.checkpoint_summary.sequence_number,
80 transformed_data,
81 ))
82 .await
83 .map_err(|_| {
84 IndexerError::MpscChannel(
85 "Failed to send checkpoint object changes, receiver half closed".into(),
86 )
87 })?;
88 Ok(())
89 }
90}
91
92#[async_trait]
93impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {
94 fn name(&self) -> String {
95 "objects_snapshot_handler".to_string()
96 }
97
98 async fn load(
99 &self,
100 transformed_data: Vec<TransactionObjectChangesToCommit>,
101 ) -> IndexerResult<()> {
102 self.store
103 .persist_objects_snapshot(transformed_data)
104 .await?;
105 Ok(())
106 }
107
108 async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>> {
110 self.store
111 .get_latest_object_snapshot_checkpoint_sequence_number()
112 .await
113 }
114
115 async fn set_watermark_hi(&self, watermark_hi: u64) -> IndexerResult<()> {
117 self.metrics
118 .latest_object_snapshot_sequence_number
119 .set(watermark_hi as i64);
120 Ok(())
121 }
122
123 async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
124 let latest_checkpoint = self.store.get_latest_checkpoint_sequence_number().await?;
125 Ok(latest_checkpoint
126 .map(|seq| seq.saturating_sub(self.snapshot_config.snapshot_min_lag as u64))
127 .unwrap_or_default()) }
129}
130
131pub async fn start_objects_snapshot_handler(
132 store: PgIndexerStore,
133 metrics: IndexerMetrics,
134 snapshot_config: SnapshotLagConfig,
135 cancel: CancellationToken,
136) -> IndexerResult<(ObjectsSnapshotHandler, u64)> {
137 info!("Starting object snapshot handler...");
138
139 let global_metrics = get_metrics().unwrap();
140 let (sender, receiver) = iota_metrics::metered_channel::channel(
141 600,
142 &global_metrics
143 .channel_inflight
144 .with_label_values(&["objects_snapshot_handler_checkpoint_data"]),
145 );
146
147 let objects_snapshot_handler =
148 ObjectsSnapshotHandler::new(store.clone(), sender, metrics.clone(), snapshot_config);
149
150 let watermark_hi = objects_snapshot_handler.get_watermark_hi().await?;
151 let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
152 spawn_monitored_task!(common_handler.start_transform_and_load(receiver, cancel));
153 Ok((objects_snapshot_handler, watermark_hi.unwrap_or_default()))
154}
155
156impl ObjectsSnapshotHandler {
157 pub fn new(
158 store: PgIndexerStore,
159 sender: Sender<(u64, TransactionObjectChangesToCommit)>,
160 metrics: IndexerMetrics,
161 snapshot_config: SnapshotLagConfig,
162 ) -> ObjectsSnapshotHandler {
163 Self {
164 store,
165 sender,
166 metrics,
167 snapshot_config,
168 }
169 }
170}