iota_indexer/handlers/
committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, HashMap};
6
7use iota_types::messages_checkpoint::CheckpointSequenceNumber;
8use tap::tap::TapFallible;
9use tokio_util::sync::CancellationToken;
10use tracing::{error, info, instrument};
11
12use super::{CheckpointDataToCommit, EpochToCommit};
13use crate::{
14    metrics::IndexerMetrics, models::transactions::TxInsertionOrder, store::IndexerStore,
15    types::IndexerResult,
16};
17
/// Default number of checkpoints committed to the DB per write batch.
/// Can be overridden at runtime via the `CHECKPOINT_COMMIT_BATCH_SIZE`
/// environment variable (see `start_tx_checkpoint_commit_task`).
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
19
20pub async fn start_tx_checkpoint_commit_task<S>(
21    state: S,
22    metrics: IndexerMetrics,
23    tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
24    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
25    cancel: CancellationToken,
26) -> IndexerResult<()>
27where
28    S: IndexerStore + Clone + Sync + Send + 'static,
29{
30    use futures::StreamExt;
31
32    info!("Indexer checkpoint commit task started...");
33    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
34        .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
35        .parse::<usize>()
36        .unwrap();
37    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");
38
39    let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
40        .ready_chunks(checkpoint_commit_batch_size);
41
42    let mut unprocessed = HashMap::new();
43    let mut batch = vec![];
44
45    while let Some(indexed_checkpoint_batch) = stream.next().await {
46        if cancel.is_cancelled() {
47            break;
48        }
49
50        // split the batch into smaller batches per epoch to handle partitioning
51        for checkpoint in indexed_checkpoint_batch {
52            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
53        }
54        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
55            let epoch = checkpoint.epoch.clone();
56            batch.push(checkpoint);
57            next_checkpoint_sequence_number += 1;
58            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
59                commit_checkpoints(&state, batch, epoch, &metrics).await;
60                batch = vec![];
61            }
62        }
63        if !batch.is_empty() && unprocessed.is_empty() {
64            commit_checkpoints(&state, batch, None, &metrics).await;
65            batch = vec![];
66        }
67    }
68    Ok(())
69}
70
/// Persists one batch of contiguous indexed checkpoints to the store.
///
/// Order of operations (the sequencing is deliberate):
/// 1. All per-table data (transactions, events, indices, displays, packages,
///    objects, object history/versions) is persisted concurrently ("step 1").
/// 2. If `epoch` is `Some`, the epoch is advanced (partition handling) and
///    participation metrics are refreshed.
/// 3. Checkpoints themselves are persisted last — presumably so that a
///    checkpoint only becomes visible after all of its data has landed;
///    TODO confirm against readers that use checkpoints as a watermark.
///
/// Panics (via `expect`) on any persistence failure — the task treats DB
/// errors as fatal. Caller must ensure `indexed_checkpoint_batch` is
/// non-empty (see the unwraps in the `instrument` fields and below).
// Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty
#[instrument(skip_all, fields(
    first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
    last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
))]
async fn commit_checkpoints<S>(
    state: &S,
    indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
    epoch: Option<EpochToCommit>,
    metrics: &IndexerMetrics,
) where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
    // Per-table accumulators; each checkpoint's data is bucketed here and
    // then flattened into one batch per table below.
    let mut checkpoint_batch = vec![];
    let mut tx_batch = vec![];
    let mut events_batch = vec![];
    let mut tx_indices_batch = vec![];
    let mut event_indices_batch = vec![];
    let mut display_updates_batch = BTreeMap::new();
    let mut object_changes_batch = vec![];
    let mut object_history_changes_batch = vec![];
    let mut object_versions_batch = vec![];
    let mut packages_batch = vec![];

    for indexed_checkpoint in indexed_checkpoint_batch {
        // Destructure so the compiler flags any field this loop forgets to
        // handle. `epoch` is ignored here: the caller passes it separately.
        let CheckpointDataToCommit {
            checkpoint,
            transactions,
            events,
            event_indices,
            tx_indices,
            display_updates,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            epoch: _,
        } = indexed_checkpoint;
        checkpoint_batch.push(checkpoint);
        tx_batch.push(transactions);
        events_batch.push(events);
        tx_indices_batch.push(tx_indices);
        event_indices_batch.push(event_indices);
        display_updates_batch.extend(display_updates.into_iter());
        object_changes_batch.push(object_changes);
        object_history_changes_batch.push(object_history_changes);
        object_versions_batch.push(object_versions);
        packages_batch.push(packages);
    }

    // Safe per the caller contract that the batch is non-empty.
    let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
    let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;

    let guard = metrics.checkpoint_db_commit_latency.start_timer();
    // Flatten the per-checkpoint vectors into single table-wide batches.
    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
    let tx_order_batch = tx_batch
        .iter()
        .map(|indexed_tx| TxInsertionOrder {
            insertion_order: -1, // fill with any value since it's ignored upon insert
            tx_digest: indexed_tx.tx_digest.into_inner().to_vec(),
        })
        .collect::<Vec<_>>();
    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();
    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
    let event_indices_batch = event_indices_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let object_versions_batch = object_versions_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
    let checkpoint_num = checkpoint_batch.len();
    let tx_count = tx_batch.len();

    {
        // Step 1: persist all non-checkpoint data concurrently. Any failure
        // is fatal (expect below).
        let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
        // NOTE(review): the three `.clone()` calls below look unnecessary —
        // the cloned batches are not used again after this point. Confirm
        // before removing; `persist_*` may take ownership either way.
        let mut persist_tasks = vec![
            state.persist_transactions(tx_batch),
            state.persist_tx_insertion_order(tx_order_batch),
            state.persist_tx_indices(tx_indices_batch),
            state.persist_events(events_batch),
            state.persist_event_indices(event_indices_batch),
            state.persist_displays(display_updates_batch),
            state.persist_packages(packages_batch),
            state.persist_objects(object_changes_batch.clone()),
            state.persist_object_history(object_history_changes_batch.clone()),
            state.persist_object_versions(object_versions_batch.clone()),
        ];
        // `epoch` is cloned because it is consumed again by the
        // partition-advance branch further down.
        if let Some(epoch_data) = epoch.clone() {
            persist_tasks.push(state.persist_epoch(epoch_data));
        }
        // Log every individual failure, then abort on the first Err via the
        // Result collect + expect.
        futures::future::join_all(persist_tasks)
            .await
            .into_iter()
            .map(|res| {
                if res.is_err() {
                    error!("Failed to persist data with error: {:?}", res);
                }
                res
            })
            .collect::<IndexerResult<Vec<_>>>()
            .expect("Persisting data into DB should not fail.");
    }

    // Remembered before `epoch` is consumed by the branch below.
    let is_epoch_end = epoch.is_some();

    // handle partitioning on epoch boundary
    if let Some(epoch_data) = epoch {
        state
            .advance_epoch(epoch_data)
            .await
            .tap_err(|e| {
                error!("Failed to advance epoch with error: {}", e.to_string());
            })
            .expect("Advancing epochs in DB should not fail.");
        metrics.total_epoch_committed.inc();

        // Refresh participation metrics after advancing epoch
        state
            .refresh_participation_metrics()
            .await
            .tap_err(|e| {
                error!("Failed to update participation metrics: {e}");
            })
            .expect("Updating participation metrics should not fail.");
    }

    // Checkpoints are persisted after all their data (see function docs).
    state
        .persist_checkpoints(checkpoint_batch)
        .await
        .tap_err(|e| {
            error!(
                "Failed to persist checkpoint data with error: {}",
                e.to_string()
            );
        })
        .expect("Persisting data into DB should not fail.");

    if is_epoch_end {
        // The epoch has advanced so we update the configs for the new protocol version,
        // if it has changed.
        let chain_id = state
            .get_chain_identifier()
            .await
            .expect("Failed to get chain identifier")
            .expect("Chain identifier should have been indexed at this point");
        // NOTE(review): the return value is discarded here. If this store
        // method is async, the future is never polled and the call is a
        // silent no-op; if it returns a Result, errors are swallowed.
        // Confirm intent — every other store call in this function is
        // awaited and checked.
        let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
    }

    let elapsed = guard.stop_and_record();

    info!(
        elapsed,
        "Checkpoint {}-{} committed with {} transactions.",
        first_checkpoint_seq,
        last_checkpoint_seq,
        tx_count,
    );
    metrics
        .latest_tx_checkpoint_sequence_number
        .set(last_checkpoint_seq as i64);
    metrics
        .total_tx_checkpoint_committed
        .inc_by(checkpoint_num as u64);
    metrics.total_transaction_committed.inc_by(tx_count as u64);
    // Batch is contiguous, so the denominator (`last - first + 1`) equals
    // the number of checkpoints and cannot be zero.
    metrics
        .transaction_per_checkpoint
        .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
    // 1000.0 is not necessarily the batch size, it's to roughly map average tx
    // commit latency to [0.1, 1] seconds, which is well covered by
    // DB_COMMIT_LATENCY_SEC_BUCKETS.
    metrics
        .thousand_transaction_avg_db_commit_latency
        .observe(elapsed * 1000.0 / tx_count as f64);
}