use std::collections::{BTreeMap, HashMap};

use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::tap::TapFallible;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument};

use super::{CheckpointDataToCommit, CheckpointDataToCommitV2, EpochToCommit};
use crate::{
    metrics::IndexerMetrics,
    store::{IndexerStore, IndexerStoreExt},
    types::IndexerResult,
};

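/// Default number of checkpoints committed to the store in a single batch.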
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;

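/// Receives indexed checkpoint data from `tx_indexing_receiver`, restores the
/// original checkpoint order, and commits the data to the store in batches.
/// A batch is flushed once it reaches the configured batch size or when a
/// checkpoint carries end-of-epoch data, and the task stops once `cancel` is
/// triggered.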
pub async fn start_tx_checkpoint_commit_task<S>(
    state: S,
    metrics: IndexerMetrics,
    tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
    cancel: CancellationToken,
) -> IndexerResult<()>
where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
    use futures::StreamExt;

    info!("Indexer checkpoint commit task started...");
    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
        .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
        .parse::<usize>()
        .unwrap();
    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");

    let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
        .ready_chunks(checkpoint_commit_batch_size);

    let mut unprocessed = HashMap::new();
    let mut batch = vec![];

    while let Some(indexed_checkpoint_batch) = stream.next().await {
        if cancel.is_cancelled() {
            break;
        }

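        // Buffer checkpoints by sequence number so they can be committed in
        // strict order even if they arrive out of order.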
        for checkpoint in indexed_checkpoint_batch {
            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
        }
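        // Drain buffered checkpoints in increasing sequence order, flushing a
        // batch whenever it is full or a checkpoint carries end-of-epoch data.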
        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
            let epoch = checkpoint.epoch.clone();
            batch.push(checkpoint);
            next_checkpoint_sequence_number += 1;
            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
                commit_checkpoints(&state, batch, epoch, &metrics).await;
                batch = vec![];
            }
        }
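        // Commit whatever is left rather than holding contiguous checkpoints
        // back until the next chunk arrives from the stream.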
        if !batch.is_empty() {
            commit_checkpoints(&state, batch, None, &metrics).await;
            batch = vec![];
        }
    }
    Ok(())
}

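/// Variant of [`start_tx_checkpoint_commit_task`] that works against an
/// [`IndexerStoreExt`] store and the `CheckpointDataToCommitV2` payload. It
/// only flushes a partial batch once every buffered checkpoint has been
/// drained.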
#[expect(unused)]
pub(crate) async fn start_tx_checkpoint_commit_task_v2<S>(
    state: S,
    metrics: IndexerMetrics,
    tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommitV2>,
    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
    cancel: CancellationToken,
) -> IndexerResult<()>
where
    S: IndexerStoreExt + Clone + Sync + Send + 'static,
{
    use futures::StreamExt;

    info!("Indexer checkpoint commit task started...");
    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
        .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
        .parse::<usize>()
        .unwrap();
    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");

    let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
        .ready_chunks(checkpoint_commit_batch_size);

    let mut unprocessed = HashMap::new();
    let mut batch = vec![];

    while let Some(indexed_checkpoint_batch) = stream.next().await {
        if cancel.is_cancelled() {
            break;
        }

        for checkpoint in indexed_checkpoint_batch {
            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
        }
        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
            let epoch = checkpoint.epoch.clone();
            batch.push(checkpoint);
            next_checkpoint_sequence_number += 1;
            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
                commit_checkpoints_v2(&state, batch, epoch, &metrics).await;
                batch = vec![];
            }
        }
        if !batch.is_empty() && unprocessed.is_empty() {
            commit_checkpoints_v2(&state, batch, None, &metrics).await;
            batch = vec![];
        }
    }
    Ok(())
}

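/// Persists a batch of indexed checkpoints to the store, commits the epoch
/// data at an epoch boundary, and records the commit metrics.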
#[instrument(skip_all, fields(
    first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
    last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
))]
async fn commit_checkpoints<S>(
    state: &S,
    indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
    epoch: Option<EpochToCommit>,
    metrics: &IndexerMetrics,
) where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
    let mut checkpoint_batch = vec![];
    let mut tx_batch = vec![];
    let mut events_batch = vec![];
    let mut tx_indices_batch = vec![];
    let mut event_indices_batch = vec![];
    let mut display_updates_batch = BTreeMap::new();
    let mut object_changes_batch = vec![];
    let mut object_history_changes_batch = vec![];
    let mut object_versions_batch = vec![];
    let mut packages_batch = vec![];

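    // Unpack each indexed checkpoint into per-table batches.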
    for indexed_checkpoint in indexed_checkpoint_batch {
        let CheckpointDataToCommit {
            checkpoint,
            transactions,
            events,
            event_indices,
            tx_indices,
            display_updates,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            epoch: _,
        } = indexed_checkpoint;
        checkpoint_batch.push(checkpoint);
        tx_batch.push(transactions);
        events_batch.push(events);
        tx_indices_batch.push(tx_indices);
        event_indices_batch.push(event_indices);
        display_updates_batch.extend(display_updates.into_iter());
        object_changes_batch.push(object_changes);
        object_history_changes_batch.push(object_history_changes);
        object_versions_batch.push(object_versions);
        packages_batch.push(packages);
    }

    let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
    let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;

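    // Time the whole DB commit and flatten the per-checkpoint vectors into
    // single batches for the store APIs.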
    let guard = metrics.checkpoint_db_commit_latency.start_timer();
    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();

    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();

    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
    let event_indices_batch = event_indices_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let object_versions_batch = object_versions_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
    let checkpoint_num = checkpoint_batch.len();
    let tx_count = tx_batch.len();

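    // Step 1: persist all per-checkpoint data except the checkpoints
    // themselves, joining the writes concurrently; the checkpoints are only
    // persisted further below, once this data has been committed.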
    {
        let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
        let mut persist_tasks = vec![
            state.persist_transactions(tx_batch),
            state.persist_tx_indices(tx_indices_batch),
            state.persist_events(events_batch),
            state.persist_event_indices(event_indices_batch),
            state.persist_displays(display_updates_batch),
            state.persist_packages(packages_batch),
            state.persist_objects(object_changes_batch.clone()),
            state.persist_object_history(object_history_changes_batch.clone()),
            state.persist_object_versions(object_versions_batch.clone()),
        ];
        if let Some(epoch_data) = epoch.clone() {
            persist_tasks.push(state.persist_epoch(epoch_data));
        }
        futures::future::join_all(persist_tasks)
            .await
            .into_iter()
            .map(|res| {
                if res.is_err() {
                    error!("Failed to persist data with error: {:?}", res);
                }
                res
            })
            .collect::<IndexerResult<Vec<_>>>()
            .expect("Persisting data into DB should not fail.");
    }

    let is_epoch_end = epoch.is_some();

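    // At an epoch boundary, advance the epoch in the store and refresh the
    // participation metrics before the checkpoints are persisted.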
    if let Some(epoch_data) = epoch {
        state
            .advance_epoch(epoch_data)
            .await
            .tap_err(|e| {
                error!("Failed to advance epoch with error: {}", e.to_string());
            })
            .expect("Advancing epochs in DB should not fail.");
        metrics.total_epoch_committed.inc();

        state
            .refresh_participation_metrics()
            .await
            .tap_err(|e| {
                error!("Failed to update participation metrics: {e}");
            })
            .expect("Updating participation metrics should not fail.");
    }

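    // Persist the checkpoints last, after all of their data has been written.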
    state
        .persist_checkpoints(checkpoint_batch)
        .await
        .tap_err(|e| {
            error!(
                "Failed to persist checkpoint data with error: {}",
                e.to_string()
            );
        })
        .expect("Persisting data into DB should not fail.");

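    // At the end of an epoch, also record protocol configs and feature flags
    // keyed by the chain identifier; the return value is ignored (`let _`).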
    if is_epoch_end {
        let chain_id = state
            .get_chain_identifier()
            .await
            .expect("Failed to get chain identifier")
            .expect("Chain identifier should have been indexed at this point");
        let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
    }

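    // Report latency and throughput metrics for this commit batch.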
    let elapsed = guard.stop_and_record();

    info!(
        elapsed,
        "Checkpoint {}-{} committed with {} transactions.",
        first_checkpoint_seq,
        last_checkpoint_seq,
        tx_count,
    );
    metrics
        .latest_tx_checkpoint_sequence_number
        .set(last_checkpoint_seq as i64);
    metrics
        .total_tx_checkpoint_committed
        .inc_by(checkpoint_num as u64);
    metrics.total_transaction_committed.inc_by(tx_count as u64);
    metrics
        .transaction_per_checkpoint
        .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
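    // Record the DB commit latency normalized to a per-1000-transactions
    // figure so that batches of different sizes are comparable.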
    metrics
        .thousand_transaction_avg_db_commit_latency
        .observe(elapsed * 1000.0 / tx_count as f64);
}

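/// `CheckpointDataToCommitV2` counterpart of [`commit_checkpoints`]; identical
/// except that it requires an [`IndexerStoreExt`] store and persists the
/// transaction indices through `persist_tx_indices_v2`.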
#[instrument(skip_all, fields(
    first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
    last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
))]
async fn commit_checkpoints_v2<S>(
    state: &S,
    indexed_checkpoint_batch: Vec<CheckpointDataToCommitV2>,
    epoch: Option<EpochToCommit>,
    metrics: &IndexerMetrics,
) where
    S: IndexerStoreExt + Clone + Sync + Send + 'static,
{
    let mut checkpoint_batch = vec![];
    let mut tx_batch = vec![];
    let mut events_batch = vec![];
    let mut tx_indices_batch = vec![];
    let mut event_indices_batch = vec![];
    let mut display_updates_batch = BTreeMap::new();
    let mut object_changes_batch = vec![];
    let mut object_history_changes_batch = vec![];
    let mut object_versions_batch = vec![];
    let mut packages_batch = vec![];

    for indexed_checkpoint in indexed_checkpoint_batch {
        let CheckpointDataToCommitV2 {
            checkpoint,
            transactions,
            events,
            event_indices,
            tx_indices,
            display_updates,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            epoch: _,
        } = indexed_checkpoint;
        checkpoint_batch.push(checkpoint);
        tx_batch.push(transactions);
        events_batch.push(events);
        tx_indices_batch.push(tx_indices);
        event_indices_batch.push(event_indices);
        display_updates_batch.extend(display_updates.into_iter());
        object_changes_batch.push(object_changes);
        object_history_changes_batch.push(object_history_changes);
        object_versions_batch.push(object_versions);
        packages_batch.push(packages);
    }

    let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
    let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;

    let guard = metrics.checkpoint_db_commit_latency.start_timer();
    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();

    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
    let event_indices_batch = event_indices_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let object_versions_batch = object_versions_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
    let checkpoint_num = checkpoint_batch.len();
    let tx_count = tx_batch.len();

    {
        let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
        let mut persist_tasks = vec![
            state.persist_transactions(tx_batch),
            state.persist_tx_indices_v2(tx_indices_batch),
            state.persist_events(events_batch),
            state.persist_event_indices(event_indices_batch),
            state.persist_displays(display_updates_batch),
            state.persist_packages(packages_batch),
            state.persist_objects(object_changes_batch.clone()),
            state.persist_object_history(object_history_changes_batch.clone()),
            state.persist_object_versions(object_versions_batch.clone()),
        ];
        if let Some(epoch_data) = epoch.clone() {
            persist_tasks.push(state.persist_epoch(epoch_data));
        }
        futures::future::join_all(persist_tasks)
            .await
            .into_iter()
            .map(|res| {
                if res.is_err() {
                    error!("Failed to persist data with error: {:?}", res);
                }
                res
            })
            .collect::<IndexerResult<Vec<_>>>()
            .expect("Persisting data into DB should not fail.");
    }

    let is_epoch_end = epoch.is_some();

    if let Some(epoch_data) = epoch {
        state
            .advance_epoch(epoch_data)
            .await
            .tap_err(|e| {
                error!("Failed to advance epoch with error: {}", e.to_string());
            })
            .expect("Advancing epochs in DB should not fail.");
        metrics.total_epoch_committed.inc();

        state
            .refresh_participation_metrics()
            .await
            .tap_err(|e| {
                error!("Failed to update participation metrics: {e}");
            })
            .expect("Updating participation metrics should not fail.");
    }

    state
        .persist_checkpoints(checkpoint_batch)
        .await
        .tap_err(|e| {
            error!(
                "Failed to persist checkpoint data with error: {}",
                e.to_string()
            );
        })
        .expect("Persisting data into DB should not fail.");

    if is_epoch_end {
        let chain_id = state
            .get_chain_identifier()
            .await
            .expect("Failed to get chain identifier")
            .expect("Chain identifier should have been indexed at this point");
        let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
    }

    let elapsed = guard.stop_and_record();

    info!(
        elapsed,
        "Checkpoint {}-{} committed with {} transactions.",
        first_checkpoint_seq,
        last_checkpoint_seq,
        tx_count,
    );
    metrics
        .latest_tx_checkpoint_sequence_number
        .set(last_checkpoint_seq as i64);
    metrics
        .total_tx_checkpoint_committed
        .inc_by(checkpoint_num as u64);
    metrics.total_transaction_committed.inc_by(tx_count as u64);
    metrics
        .transaction_per_checkpoint
        .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
    metrics
        .thousand_transaction_avg_db_commit_latency
        .observe(elapsed * 1000.0 / tx_count as f64);
}