iota_indexer/handlers/committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, HashMap};
6
7use iota_types::messages_checkpoint::CheckpointSequenceNumber;
8use tap::tap::TapFallible;
9use tokio_util::sync::CancellationToken;
10use tracing::{error, info, instrument};
11
12use super::{CheckpointDataToCommit, CheckpointDataToCommitV2, EpochToCommit};
13use crate::{
14    metrics::IndexerMetrics,
15    store::{IndexerStore, IndexerStoreExt},
16    types::IndexerResult,
17};
18
19pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
20
21pub async fn start_tx_checkpoint_commit_task<S>(
22    state: S,
23    metrics: IndexerMetrics,
24    tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
25    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
26    cancel: CancellationToken,
27) -> IndexerResult<()>
28where
29    S: IndexerStore + Clone + Sync + Send + 'static,
30{
31    use futures::StreamExt;
32
33    info!("Indexer checkpoint commit task started...");
34    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
35        .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
36        .parse::<usize>()
37        .unwrap();
38    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");
39
40    let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
41        .ready_chunks(checkpoint_commit_batch_size);
42
43    let mut unprocessed = HashMap::new();
44    let mut batch = vec![];
45
46    while let Some(indexed_checkpoint_batch) = stream.next().await {
47        if cancel.is_cancelled() {
48            break;
49        }
50
51        // split the batch into smaller batches per epoch to handle partitioning
52        for checkpoint in indexed_checkpoint_batch {
53            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
54        }
55        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
56            let epoch = checkpoint.epoch.clone();
57            batch.push(checkpoint);
58            next_checkpoint_sequence_number += 1;
59            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
60                commit_checkpoints(&state, batch, epoch, &metrics).await;
61                batch = vec![];
62            }
63        }
64        if !batch.is_empty() {
65            commit_checkpoints(&state, batch, None, &metrics).await;
66            batch = vec![];
67        }
68    }
69    Ok(())
70}
71
72#[expect(unused)]
73pub(crate) async fn start_tx_checkpoint_commit_task_v2<S>(
74    state: S,
75    metrics: IndexerMetrics,
76    tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommitV2>,
77    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
78    cancel: CancellationToken,
79) -> IndexerResult<()>
80where
81    S: IndexerStoreExt + Clone + Sync + Send + 'static,
82{
83    use futures::StreamExt;
84
85    info!("Indexer checkpoint commit task started...");
86    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
87        .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
88        .parse::<usize>()
89        .unwrap();
90    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");
91
92    let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
93        .ready_chunks(checkpoint_commit_batch_size);
94
95    let mut unprocessed = HashMap::new();
96    let mut batch = vec![];
97
98    while let Some(indexed_checkpoint_batch) = stream.next().await {
99        if cancel.is_cancelled() {
100            break;
101        }
102
103        // split the batch into smaller batches per epoch to handle partitioning
104        for checkpoint in indexed_checkpoint_batch {
105            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
106        }
107        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
108            let epoch = checkpoint.epoch.clone();
109            batch.push(checkpoint);
110            next_checkpoint_sequence_number += 1;
111            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
112                commit_checkpoints_v2(&state, batch, epoch, &metrics).await;
113                batch = vec![];
114            }
115        }
116        if !batch.is_empty() && unprocessed.is_empty() {
117            commit_checkpoints_v2(&state, batch, None, &metrics).await;
118            batch = vec![];
119        }
120    }
121    Ok(())
122}
123
/// Persists one batch of checkpoint-derived data into the store.
///
/// Order of operations:
/// 1. "Step 1": all per-table writes (transactions, tx indices, events,
///    event indices, displays, packages, objects, object history, object
///    versions, plus the epoch row if present) are issued concurrently via
///    `join_all`.
/// 2. On an epoch boundary (`epoch.is_some()`), the epoch is advanced and
///    participation metrics are refreshed.
/// 3. The checkpoint rows themselves are persisted last — presumably so a
///    checkpoint only becomes visible once all of its data is committed;
///    confirm against reader-side watermark assumptions.
///
/// # Panics
/// Panics if `indexed_checkpoint_batch` is empty, or if any store operation
/// fails (each failure is logged before the `expect`).
// Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty
#[instrument(skip_all, fields(
    first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
    last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
))]
async fn commit_checkpoints<S>(
    state: &S,
    indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
    epoch: Option<EpochToCommit>,
    metrics: &IndexerMetrics,
) where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
    // Per-table accumulators; each checkpoint's payload is destructured into
    // these below so every table can be written as one batch.
    let mut checkpoint_batch = vec![];
    let mut tx_batch = vec![];
    let mut events_batch = vec![];
    let mut tx_indices_batch = vec![];
    let mut event_indices_batch = vec![];
    let mut display_updates_batch = BTreeMap::new();
    let mut object_changes_batch = vec![];
    let mut object_history_changes_batch = vec![];
    let mut object_versions_batch = vec![];
    let mut packages_batch = vec![];

    for indexed_checkpoint in indexed_checkpoint_batch {
        let CheckpointDataToCommit {
            checkpoint,
            transactions,
            events,
            event_indices,
            tx_indices,
            display_updates,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            // Epoch data is passed separately via the `epoch` parameter.
            epoch: _,
        } = indexed_checkpoint;
        checkpoint_batch.push(checkpoint);
        tx_batch.push(transactions);
        events_batch.push(events);
        tx_indices_batch.push(tx_indices);
        event_indices_batch.push(event_indices);
        // Later checkpoints overwrite earlier display entries for the same key.
        display_updates_batch.extend(display_updates.into_iter());
        object_changes_batch.push(object_changes);
        object_history_changes_batch.push(object_history_changes);
        object_versions_batch.push(object_versions);
        packages_batch.push(packages);
    }

    // Unwraps are safe per the caller contract (non-empty batch).
    let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
    let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;

    let guard = metrics.checkpoint_db_commit_latency.start_timer();
    // Flatten the per-checkpoint vectors into single per-table batches.
    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();

    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();

    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
    let event_indices_batch = event_indices_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let object_versions_batch = object_versions_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
    let checkpoint_num = checkpoint_batch.len();
    let tx_count = tx_batch.len();

    {
        let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
        // All table writes run concurrently; any single failure aborts the
        // process via the `expect` below.
        let mut persist_tasks = vec![
            state.persist_transactions(tx_batch),
            state.persist_tx_indices(tx_indices_batch),
            state.persist_events(events_batch),
            state.persist_event_indices(event_indices_batch),
            state.persist_displays(display_updates_batch),
            state.persist_packages(packages_batch),
            state.persist_objects(object_changes_batch.clone()),
            state.persist_object_history(object_history_changes_batch.clone()),
            state.persist_object_versions(object_versions_batch.clone()),
        ];
        if let Some(epoch_data) = epoch.clone() {
            persist_tasks.push(state.persist_epoch(epoch_data));
        }
        futures::future::join_all(persist_tasks)
            .await
            .into_iter()
            // `map` here is used for its logging side effect before the
            // short-circuiting collect into a single Result.
            .map(|res| {
                if res.is_err() {
                    error!("Failed to persist data with error: {:?}", res);
                }
                res
            })
            .collect::<IndexerResult<Vec<_>>>()
            .expect("Persisting data into DB should not fail.");
    }

    // Remember before `epoch` is consumed below.
    let is_epoch_end = epoch.is_some();

    // handle partitioning on epoch boundary
    if let Some(epoch_data) = epoch {
        state
            .advance_epoch(epoch_data)
            .await
            .tap_err(|e| {
                error!("Failed to advance epoch with error: {}", e.to_string());
            })
            .expect("Advancing epochs in DB should not fail.");
        metrics.total_epoch_committed.inc();

        // Refresh participation metrics after advancing epoch
        state
            .refresh_participation_metrics()
            .await
            .tap_err(|e| {
                error!("Failed to update participation metrics: {e}");
            })
            .expect("Updating participation metrics should not fail.");
    }

    // Checkpoints are persisted only after all other data above succeeded.
    state
        .persist_checkpoints(checkpoint_batch)
        .await
        .tap_err(|e| {
            error!(
                "Failed to persist checkpoint data with error: {}",
                e.to_string()
            );
        })
        .expect("Persisting data into DB should not fail.");

    if is_epoch_end {
        // The epoch has advanced so we update the configs for the new protocol version,
        // if it has changed.
        let chain_id = state
            .get_chain_identifier()
            .await
            .expect("Failed to get chain identifier")
            .expect("Chain identifier should have been indexed at this point");
        // NOTE(review): the result is discarded here; if this method is async,
        // the future is dropped unawaited and never runs — confirm it is
        // synchronous and that ignoring its error is intentional.
        let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
    }

    let elapsed = guard.stop_and_record();

    info!(
        elapsed,
        "Checkpoint {}-{} committed with {} transactions.",
        first_checkpoint_seq,
        last_checkpoint_seq,
        tx_count,
    );
    metrics
        .latest_tx_checkpoint_sequence_number
        .set(last_checkpoint_seq as i64);
    metrics
        .total_tx_checkpoint_committed
        .inc_by(checkpoint_num as u64);
    metrics.total_transaction_committed.inc_by(tx_count as u64);
    metrics
        .transaction_per_checkpoint
        .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
    // 1000.0 is not necessarily the batch size, it's to roughly map average tx
    // commit latency to [0.1, 1] seconds, which is well covered by
    // DB_COMMIT_LATENCY_SEC_BUCKETS.
    metrics
        .thousand_transaction_avg_db_commit_latency
        .observe(elapsed * 1000.0 / tx_count as f64);
}
295
/// V2 variant of `commit_checkpoints`: persists one batch of
/// `CheckpointDataToCommitV2` into an `IndexerStoreExt` store. Differs from
/// v1 only in the data type and in using `persist_tx_indices_v2`.
///
/// Order of operations:
/// 1. "Step 1": all per-table writes are issued concurrently via `join_all`.
/// 2. On an epoch boundary (`epoch.is_some()`), the epoch is advanced and
///    participation metrics are refreshed.
/// 3. The checkpoint rows themselves are persisted last — presumably so a
///    checkpoint only becomes visible once all of its data is committed;
///    confirm against reader-side watermark assumptions.
///
/// # Panics
/// Panics if `indexed_checkpoint_batch` is empty, or if any store operation
/// fails (each failure is logged before the `expect`).
// Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty
#[instrument(skip_all, fields(
first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
))]
async fn commit_checkpoints_v2<S>(
    state: &S,
    indexed_checkpoint_batch: Vec<CheckpointDataToCommitV2>,
    epoch: Option<EpochToCommit>,
    metrics: &IndexerMetrics,
) where
    S: IndexerStoreExt + Clone + Sync + Send + 'static,
{
    // Per-table accumulators; each checkpoint's payload is destructured into
    // these below so every table can be written as one batch.
    let mut checkpoint_batch = vec![];
    let mut tx_batch = vec![];
    let mut events_batch = vec![];
    let mut tx_indices_batch = vec![];
    let mut event_indices_batch = vec![];
    let mut display_updates_batch = BTreeMap::new();
    let mut object_changes_batch = vec![];
    let mut object_history_changes_batch = vec![];
    let mut object_versions_batch = vec![];
    let mut packages_batch = vec![];

    for indexed_checkpoint in indexed_checkpoint_batch {
        let CheckpointDataToCommitV2 {
            checkpoint,
            transactions,
            events,
            event_indices,
            tx_indices,
            display_updates,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            // Epoch data is passed separately via the `epoch` parameter.
            epoch: _,
        } = indexed_checkpoint;
        checkpoint_batch.push(checkpoint);
        tx_batch.push(transactions);
        events_batch.push(events);
        tx_indices_batch.push(tx_indices);
        event_indices_batch.push(event_indices);
        // Later checkpoints overwrite earlier display entries for the same key.
        display_updates_batch.extend(display_updates.into_iter());
        object_changes_batch.push(object_changes);
        object_history_changes_batch.push(object_history_changes);
        object_versions_batch.push(object_versions);
        packages_batch.push(packages);
    }

    // Unwraps are safe per the caller contract (non-empty batch).
    let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
    let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;

    let guard = metrics.checkpoint_db_commit_latency.start_timer();
    // Flatten the per-checkpoint vectors into single per-table batches.
    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();

    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
    let event_indices_batch = event_indices_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let object_versions_batch = object_versions_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
    let checkpoint_num = checkpoint_batch.len();
    let tx_count = tx_batch.len();

    {
        let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
        // All table writes run concurrently; any single failure aborts the
        // process via the `expect` below.
        let mut persist_tasks = vec![
            state.persist_transactions(tx_batch),
            state.persist_tx_indices_v2(tx_indices_batch),
            state.persist_events(events_batch),
            state.persist_event_indices(event_indices_batch),
            state.persist_displays(display_updates_batch),
            state.persist_packages(packages_batch),
            state.persist_objects(object_changes_batch.clone()),
            state.persist_object_history(object_history_changes_batch.clone()),
            state.persist_object_versions(object_versions_batch.clone()),
        ];
        if let Some(epoch_data) = epoch.clone() {
            persist_tasks.push(state.persist_epoch(epoch_data));
        }
        futures::future::join_all(persist_tasks)
            .await
            .into_iter()
            // `map` here is used for its logging side effect before the
            // short-circuiting collect into a single Result.
            .map(|res| {
                if res.is_err() {
                    error!("Failed to persist data with error: {:?}", res);
                }
                res
            })
            .collect::<IndexerResult<Vec<_>>>()
            .expect("Persisting data into DB should not fail.");
    }

    // Remember before `epoch` is consumed below.
    let is_epoch_end = epoch.is_some();

    // handle partitioning on epoch boundary
    if let Some(epoch_data) = epoch {
        state
            .advance_epoch(epoch_data)
            .await
            .tap_err(|e| {
                error!("Failed to advance epoch with error: {}", e.to_string());
            })
            .expect("Advancing epochs in DB should not fail.");
        metrics.total_epoch_committed.inc();

        // Refresh participation metrics after advancing epoch
        state
            .refresh_participation_metrics()
            .await
            .tap_err(|e| {
                error!("Failed to update participation metrics: {e}");
            })
            .expect("Updating participation metrics should not fail.");
    }

    // Checkpoints are persisted only after all other data above succeeded.
    state
        .persist_checkpoints(checkpoint_batch)
        .await
        .tap_err(|e| {
            error!(
                "Failed to persist checkpoint data with error: {}",
                e.to_string()
            );
        })
        .expect("Persisting data into DB should not fail.");

    if is_epoch_end {
        // The epoch has advanced so we update the configs for the new protocol version,
        // if it has changed.
        let chain_id = state
            .get_chain_identifier()
            .await
            .expect("Failed to get chain identifier")
            .expect("Chain identifier should have been indexed at this point");
        // NOTE(review): the result is discarded here; if this method is async,
        // the future is dropped unawaited and never runs — confirm it is
        // synchronous and that ignoring its error is intentional.
        let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
    }

    let elapsed = guard.stop_and_record();

    info!(
        elapsed,
        "Checkpoint {}-{} committed with {} transactions.",
        first_checkpoint_seq,
        last_checkpoint_seq,
        tx_count,
    );
    metrics
        .latest_tx_checkpoint_sequence_number
        .set(last_checkpoint_seq as i64);
    metrics
        .total_tx_checkpoint_committed
        .inc_by(checkpoint_num as u64);
    metrics.total_transaction_committed.inc_by(tx_count as u64);
    metrics
        .transaction_per_checkpoint
        .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
    // 1000.0 is not necessarily the batch size, it's to roughly map average tx
    // commit latency to [0.1, 1] seconds, which is well covered by
    // DB_COMMIT_LATENCY_SEC_BUCKETS.
    metrics
        .thousand_transaction_avg_db_commit_latency
        .observe(elapsed * 1000.0 / tx_count as f64);
}