iota_indexer/handlers/
committer.rs1use std::collections::{BTreeMap, HashMap};
6
7use iota_types::messages_checkpoint::CheckpointSequenceNumber;
8use tap::tap::TapFallible;
9use tokio_util::sync::CancellationToken;
10use tracing::{error, info, instrument};
11
12use super::{CheckpointDataToCommit, EpochToCommit};
13use crate::{
14 metrics::IndexerMetrics, models::transactions::TxInsertionOrder, store::IndexerStore,
15 types::IndexerResult,
16};
17
18pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
19
20pub async fn start_tx_checkpoint_commit_task<S>(
21 state: S,
22 metrics: IndexerMetrics,
23 tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
24 mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
25 cancel: CancellationToken,
26) -> IndexerResult<()>
27where
28 S: IndexerStore + Clone + Sync + Send + 'static,
29{
30 use futures::StreamExt;
31
32 info!("Indexer checkpoint commit task started...");
33 let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
34 .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
35 .parse::<usize>()
36 .unwrap();
37 info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");
38
39 let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
40 .ready_chunks(checkpoint_commit_batch_size);
41
42 let mut unprocessed = HashMap::new();
43 let mut batch = vec![];
44
45 while let Some(indexed_checkpoint_batch) = stream.next().await {
46 if cancel.is_cancelled() {
47 break;
48 }
49
50 for checkpoint in indexed_checkpoint_batch {
52 unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
53 }
54 while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
55 let epoch = checkpoint.epoch.clone();
56 batch.push(checkpoint);
57 next_checkpoint_sequence_number += 1;
58 if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
59 commit_checkpoints(&state, batch, epoch, &metrics).await;
60 batch = vec![];
61 }
62 }
63 if !batch.is_empty() && unprocessed.is_empty() {
64 commit_checkpoints(&state, batch, None, &metrics).await;
65 batch = vec![];
66 }
67 }
68 Ok(())
69}
70
71#[instrument(skip_all, fields(
73 first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
74 last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
75))]
76async fn commit_checkpoints<S>(
77 state: &S,
78 indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
79 epoch: Option<EpochToCommit>,
80 metrics: &IndexerMetrics,
81) where
82 S: IndexerStore + Clone + Sync + Send + 'static,
83{
84 let mut checkpoint_batch = vec![];
85 let mut tx_batch = vec![];
86 let mut events_batch = vec![];
87 let mut tx_indices_batch = vec![];
88 let mut event_indices_batch = vec![];
89 let mut display_updates_batch = BTreeMap::new();
90 let mut object_changes_batch = vec![];
91 let mut object_history_changes_batch = vec![];
92 let mut object_versions_batch = vec![];
93 let mut packages_batch = vec![];
94
95 for indexed_checkpoint in indexed_checkpoint_batch {
96 let CheckpointDataToCommit {
97 checkpoint,
98 transactions,
99 events,
100 event_indices,
101 tx_indices,
102 display_updates,
103 object_changes,
104 object_history_changes,
105 object_versions,
106 packages,
107 epoch: _,
108 } = indexed_checkpoint;
109 checkpoint_batch.push(checkpoint);
110 tx_batch.push(transactions);
111 events_batch.push(events);
112 tx_indices_batch.push(tx_indices);
113 event_indices_batch.push(event_indices);
114 display_updates_batch.extend(display_updates.into_iter());
115 object_changes_batch.push(object_changes);
116 object_history_changes_batch.push(object_history_changes);
117 object_versions_batch.push(object_versions);
118 packages_batch.push(packages);
119 }
120
121 let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
122 let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;
123
124 let guard = metrics.checkpoint_db_commit_latency.start_timer();
125 let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
126 let tx_order_batch = tx_batch
127 .iter()
128 .map(|indexed_tx| TxInsertionOrder {
129 insertion_order: -1, tx_digest: indexed_tx.tx_digest.into_inner().to_vec(),
131 })
132 .collect::<Vec<_>>();
133 let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();
134 let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
135 let event_indices_batch = event_indices_batch
136 .into_iter()
137 .flatten()
138 .collect::<Vec<_>>();
139 let object_versions_batch = object_versions_batch
140 .into_iter()
141 .flatten()
142 .collect::<Vec<_>>();
143 let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
144 let checkpoint_num = checkpoint_batch.len();
145 let tx_count = tx_batch.len();
146
147 {
148 let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
149 let mut persist_tasks = vec![
150 state.persist_transactions(tx_batch),
151 state.persist_tx_insertion_order(tx_order_batch),
152 state.persist_tx_indices(tx_indices_batch),
153 state.persist_events(events_batch),
154 state.persist_event_indices(event_indices_batch),
155 state.persist_displays(display_updates_batch),
156 state.persist_packages(packages_batch),
157 state.persist_objects(object_changes_batch.clone()),
158 state.persist_object_history(object_history_changes_batch.clone()),
159 state.persist_object_versions(object_versions_batch.clone()),
160 ];
161 if let Some(epoch_data) = epoch.clone() {
162 persist_tasks.push(state.persist_epoch(epoch_data));
163 }
164 futures::future::join_all(persist_tasks)
165 .await
166 .into_iter()
167 .map(|res| {
168 if res.is_err() {
169 error!("Failed to persist data with error: {:?}", res);
170 }
171 res
172 })
173 .collect::<IndexerResult<Vec<_>>>()
174 .expect("Persisting data into DB should not fail.");
175 }
176
177 let is_epoch_end = epoch.is_some();
178
179 if let Some(epoch_data) = epoch {
181 state
182 .advance_epoch(epoch_data)
183 .await
184 .tap_err(|e| {
185 error!("Failed to advance epoch with error: {}", e.to_string());
186 })
187 .expect("Advancing epochs in DB should not fail.");
188 metrics.total_epoch_committed.inc();
189
190 state
192 .refresh_participation_metrics()
193 .await
194 .tap_err(|e| {
195 error!("Failed to update participation metrics: {e}");
196 })
197 .expect("Updating participation metrics should not fail.");
198 }
199
200 state
201 .persist_checkpoints(checkpoint_batch)
202 .await
203 .tap_err(|e| {
204 error!(
205 "Failed to persist checkpoint data with error: {}",
206 e.to_string()
207 );
208 })
209 .expect("Persisting data into DB should not fail.");
210
211 if is_epoch_end {
212 let chain_id = state
215 .get_chain_identifier()
216 .await
217 .expect("Failed to get chain identifier")
218 .expect("Chain identifier should have been indexed at this point");
219 let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
220 }
221
222 let elapsed = guard.stop_and_record();
223
224 info!(
225 elapsed,
226 "Checkpoint {}-{} committed with {} transactions.",
227 first_checkpoint_seq,
228 last_checkpoint_seq,
229 tx_count,
230 );
231 metrics
232 .latest_tx_checkpoint_sequence_number
233 .set(last_checkpoint_seq as i64);
234 metrics
235 .total_tx_checkpoint_committed
236 .inc_by(checkpoint_num as u64);
237 metrics.total_transaction_committed.inc_by(tx_count as u64);
238 metrics
239 .transaction_per_checkpoint
240 .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
241 metrics
245 .thousand_transaction_avg_db_commit_latency
246 .observe(elapsed * 1000.0 / tx_count as f64);
247}