iota_indexer/handlers/
committer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::collections::{BTreeMap, HashMap};

use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::tap::TapFallible;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument};

use super::{CheckpointDataToCommit, EpochToCommit};
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};

pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;

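/// Receives `CheckpointDataToCommit` from the indexing pipeline, restores strict
/// sequence order, and commits the data to the `IndexerStore` in batches.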
pub async fn start_tx_checkpoint_commit_task<S>(
    state: S,
    metrics: IndexerMetrics,
    tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
    cancel: CancellationToken,
) -> IndexerResult<()>
where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
    use futures::StreamExt;

    info!("Indexer checkpoint commit task started...");
    // Allow the default batch size to be overridden via the environment.
    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
        .unwrap_or_else(|_| CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
        .parse::<usize>()
        .expect("CHECKPOINT_COMMIT_BATCH_SIZE must be a valid usize");
    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");

    let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
        .ready_chunks(checkpoint_commit_batch_size);

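    // Checkpoint data can arrive out of order on `tx_indexing_receiver`, so it is
    // buffered in `unprocessed` until the next expected sequence number is available.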
    let mut unprocessed = HashMap::new();
    let mut batch = vec![];

    while let Some(indexed_checkpoint_batch) = stream.next().await {
        if cancel.is_cancelled() {
            break;
        }

        // Restore strict sequence order and split the batch at epoch boundaries so
        // that partitioning on epoch change can be handled.
        for checkpoint in indexed_checkpoint_batch {
            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
        }
        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
            let epoch = checkpoint.epoch.clone();
            batch.push(checkpoint);
            next_checkpoint_sequence_number += 1;
            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
                commit_checkpoints(&state, batch, epoch, &metrics).await;
                batch = vec![];
            }
        }
        // Flush the partial batch once no out-of-order checkpoints remain buffered.
        if !batch.is_empty() && unprocessed.is_empty() {
            commit_checkpoints(&state, batch, None, &metrics).await;
            batch = vec![];
        }
    }
    Ok(())
}

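// Commits one batch of indexed checkpoints: all per-checkpoint tables are persisted
// concurrently, the epoch is advanced if this batch ends one, and the checkpoint
// rows themselves are written last.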
// Unwrap safety: the caller must ensure `indexed_checkpoint_batch` is not empty.
#[instrument(skip_all, fields(
    first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
    last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
))]
async fn commit_checkpoints<S>(
    state: &S,
    indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
    epoch: Option<EpochToCommit>,
    metrics: &IndexerMetrics,
) where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
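    // Per-table accumulators; each checkpoint's data is split into these and later
    // flattened into single batches before being persisted.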
    let mut checkpoint_batch = vec![];
    let mut tx_batch = vec![];
    let mut events_batch = vec![];
    let mut tx_indices_batch = vec![];
    let mut event_indices_batch = vec![];
    let mut display_updates_batch = BTreeMap::new();
    let mut object_changes_batch = vec![];
    let mut object_history_changes_batch = vec![];
    let mut object_versions_batch = vec![];
    let mut packages_batch = vec![];

    for indexed_checkpoint in indexed_checkpoint_batch {
        let CheckpointDataToCommit {
            checkpoint,
            transactions,
            events,
            event_indices,
            tx_indices,
            display_updates,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            epoch: _,
        } = indexed_checkpoint;
        checkpoint_batch.push(checkpoint);
        tx_batch.push(transactions);
        events_batch.push(events);
        tx_indices_batch.push(tx_indices);
        event_indices_batch.push(event_indices);
        display_updates_batch.extend(display_updates.into_iter());
        object_changes_batch.push(object_changes);
        object_history_changes_batch.push(object_history_changes);
        object_versions_batch.push(object_versions);
        packages_batch.push(packages);
    }

    let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
    let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;

    let guard = metrics.checkpoint_db_commit_latency.start_timer();
    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();
    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
    let event_indices_batch = event_indices_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let object_versions_batch = object_versions_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
    let checkpoint_num = checkpoint_batch.len();
    let tx_count = tx_batch.len();

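    // Persist all per-checkpoint tables concurrently; the checkpoints themselves are
    // only written further below, after this step and any epoch handling succeed.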
    {
        let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
        let mut persist_tasks = vec![
            state.persist_transactions(tx_batch),
            state.persist_tx_indices(tx_indices_batch),
            state.persist_events(events_batch),
            state.persist_event_indices(event_indices_batch),
            state.persist_displays(display_updates_batch),
            state.persist_packages(packages_batch),
            state.persist_objects(object_changes_batch.clone()),
            state.persist_object_history(object_history_changes_batch.clone()),
            state.persist_object_versions(object_versions_batch.clone()),
        ];
        if let Some(epoch_data) = epoch.clone() {
            persist_tasks.push(state.persist_epoch(epoch_data));
        }
        futures::future::join_all(persist_tasks)
            .await
            .into_iter()
            .map(|res| {
                if res.is_err() {
                    error!("Failed to persist data with error: {:?}", res);
                }
                res
            })
            .collect::<IndexerResult<Vec<_>>>()
            .expect("Persisting data into DB should not fail.");
    }

    let is_epoch_end = epoch.is_some();

    // handle partitioning on epoch boundary
    if let Some(epoch_data) = epoch {
        state
            .advance_epoch(epoch_data)
            .await
            .tap_err(|e| {
                error!("Failed to advance epoch with error: {}", e.to_string());
            })
            .expect("Advancing epochs in DB should not fail.");
        metrics.total_epoch_committed.inc();

        // Refresh participation metrics after advancing epoch
        state
            .refresh_participation_metrics()
            .await
            .tap_err(|e| {
                error!("Failed to update participation metrics: {e}");
            })
            .expect("Updating participation metrics should not fail.");
    }

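    // Checkpoints are persisted last, only after all of their associated data above
    // has been committed.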
    state
        .persist_checkpoints(checkpoint_batch)
        .await
        .tap_err(|e| {
            error!(
                "Failed to persist checkpoint data with error: {}",
                e.to_string()
            );
        })
        .expect("Persisting data into DB should not fail.");

    if is_epoch_end {
        // The epoch has advanced, so we update the configs for the new protocol
        // version, if it has changed.
        let chain_id = state
            .get_chain_identifier()
            .await
            .expect("Failed to get chain identifier")
            .expect("Chain identifier should have been indexed at this point");
        let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
    }

    let elapsed = guard.stop_and_record();

    info!(
        elapsed,
        "Checkpoint {}-{} committed with {} transactions.",
        first_checkpoint_seq,
        last_checkpoint_seq,
        tx_count,
    );
    metrics
        .latest_tx_checkpoint_sequence_number
        .set(last_checkpoint_seq as i64);
    metrics
        .total_tx_checkpoint_committed
        .inc_by(checkpoint_num as u64);
    metrics.total_transaction_committed.inc_by(tx_count as u64);
    metrics
        .transaction_per_checkpoint
        .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
    // 1000.0 is not necessarily the batch size; it roughly maps the average tx
    // commit latency into [0.1, 1] seconds, which is well covered by
    // DB_COMMIT_LATENCY_SEC_BUCKETS.
    metrics
        .thousand_transaction_avg_db_commit_latency
        .observe(elapsed * 1000.0 / tx_count as f64);
}