iota_indexer/handlers/committer.rs

use std::collections::{BTreeMap, HashMap};

use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::tap::TapFallible;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument};

use super::{CheckpointDataToCommit, EpochToCommit};
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};

pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;

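/// Receives `CheckpointDataToCommit` from the indexing pipeline, reorders it by
/// checkpoint sequence number, and commits it to the store in batches until the
/// task is cancelled.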
pub async fn start_tx_checkpoint_commit_task<S>(
    state: S,
    metrics: IndexerMetrics,
    tx_indexing_receiver: iota_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
    mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
    cancel: CancellationToken,
) -> IndexerResult<()>
where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
    use futures::StreamExt;

    info!("Indexer checkpoint commit task started...");
    let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
        .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
        .parse::<usize>()
        .unwrap();
    info!("Using checkpoint commit batch size {checkpoint_commit_batch_size}");

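    // Pull checkpoints off the channel in chunks of up to
    // `checkpoint_commit_batch_size` items that are already ready.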
    let mut stream = iota_metrics::metered_channel::ReceiverStream::new(tx_indexing_receiver)
        .ready_chunks(checkpoint_commit_batch_size);

    let mut unprocessed = HashMap::new();
    let mut batch = vec![];

    while let Some(indexed_checkpoint_batch) = stream.next().await {
        if cancel.is_cancelled() {
            break;
        }

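        // Checkpoints can arrive out of order; buffer them by sequence number,
        // then drain them strictly in order, committing whenever the batch is
        // full or an epoch boundary is reached.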
        for checkpoint in indexed_checkpoint_batch {
            unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
        }
        while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
            let epoch = checkpoint.epoch.clone();
            batch.push(checkpoint);
            next_checkpoint_sequence_number += 1;
            if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
                commit_checkpoints(&state, batch, epoch, &metrics).await;
                batch = vec![];
            }
        }
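        // Flush a partial batch once no further contiguous checkpoints are buffered.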
        if !batch.is_empty() && unprocessed.is_empty() {
            commit_checkpoints(&state, batch, None, &metrics).await;
            batch = vec![];
        }
    }
    Ok(())
}

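/// Commits a batch of indexed checkpoint data to the store, advancing the epoch
/// and refreshing epoch-related data when the batch closes an epoch, and records
/// the corresponding metrics.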
#[instrument(skip_all, fields(
    first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
    last = indexed_checkpoint_batch.last().as_ref().unwrap().checkpoint.sequence_number
))]
async fn commit_checkpoints<S>(
    state: &S,
    indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
    epoch: Option<EpochToCommit>,
    metrics: &IndexerMetrics,
) where
    S: IndexerStore + Clone + Sync + Send + 'static,
{
    let mut checkpoint_batch = vec![];
    let mut tx_batch = vec![];
    let mut events_batch = vec![];
    let mut tx_indices_batch = vec![];
    let mut event_indices_batch = vec![];
    let mut display_updates_batch = BTreeMap::new();
    let mut object_changes_batch = vec![];
    let mut object_history_changes_batch = vec![];
    let mut object_versions_batch = vec![];
    let mut packages_batch = vec![];

    for indexed_checkpoint in indexed_checkpoint_batch {
        let CheckpointDataToCommit {
            checkpoint,
            transactions,
            events,
            event_indices,
            tx_indices,
            display_updates,
            object_changes,
            object_history_changes,
            object_versions,
            packages,
            epoch: _,
        } = indexed_checkpoint;
        checkpoint_batch.push(checkpoint);
        tx_batch.push(transactions);
        events_batch.push(events);
        tx_indices_batch.push(tx_indices);
        event_indices_batch.push(event_indices);
        display_updates_batch.extend(display_updates.into_iter());
        object_changes_batch.push(object_changes);
        object_history_changes_batch.push(object_history_changes);
        object_versions_batch.push(object_versions);
        packages_batch.push(packages);
    }

    let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
    let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;

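    // Flatten the per-checkpoint vectors into single batches for the DB writes.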
    let guard = metrics.checkpoint_db_commit_latency.start_timer();
    let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
    let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();
    let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
    let event_indices_batch = event_indices_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let object_versions_batch = object_versions_batch
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();
    let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
    let checkpoint_num = checkpoint_batch.len();
    let tx_count = tx_batch.len();

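    // Step 1: persist everything except the checkpoints themselves, running the
    // store writes concurrently and failing hard if any of them errors.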
    {
        let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
        let mut persist_tasks = vec![
            state.persist_transactions(tx_batch),
            state.persist_tx_indices(tx_indices_batch),
            state.persist_events(events_batch),
            state.persist_event_indices(event_indices_batch),
            state.persist_displays(display_updates_batch),
            state.persist_packages(packages_batch),
            state.persist_objects(object_changes_batch.clone()),
            state.persist_object_history(object_history_changes_batch.clone()),
            state.persist_object_versions(object_versions_batch.clone()),
        ];
        if let Some(epoch_data) = epoch.clone() {
            persist_tasks.push(state.persist_epoch(epoch_data));
        }
        futures::future::join_all(persist_tasks)
            .await
            .into_iter()
            .map(|res| {
                if res.is_err() {
                    error!("Failed to persist data with error: {:?}", res);
                }
                res
            })
            .collect::<IndexerResult<Vec<_>>>()
            .expect("Persisting data into DB should not fail.");
    }

    let is_epoch_end = epoch.is_some();

    if let Some(epoch_data) = epoch {
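        // The batch closes an epoch: advance the epoch in the store.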
        state
            .advance_epoch(epoch_data)
            .await
            .tap_err(|e| {
                error!("Failed to advance epoch with error: {}", e.to_string());
            })
            .expect("Advancing epochs in DB should not fail.");
        metrics.total_epoch_committed.inc();

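        // Refresh participation metrics now that the epoch has been committed.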
        state
            .refresh_participation_metrics()
            .await
            .tap_err(|e| {
                error!("Failed to update participation metrics: {e}");
            })
            .expect("Updating participation metrics should not fail.");
    }

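    // Persist the checkpoint rows only after all of their associated data has
    // been committed above.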
    state
        .persist_checkpoints(checkpoint_batch)
        .await
        .tap_err(|e| {
            error!(
                "Failed to persist checkpoint data with error: {}",
                e.to_string()
            );
        })
        .expect("Persisting data into DB should not fail.");

    if is_epoch_end {
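        // At an epoch boundary, persist the protocol configs and feature flags
        // for this chain.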
        let chain_id = state
            .get_chain_identifier()
            .await
            .expect("Failed to get chain identifier")
            .expect("Chain identifier should have been indexed at this point");
        let _ = state.persist_protocol_configs_and_feature_flags(chain_id);
    }

    let elapsed = guard.stop_and_record();

    info!(
        elapsed,
        "Checkpoint {}-{} committed with {} transactions.",
        first_checkpoint_seq,
        last_checkpoint_seq,
        tx_count,
    );
    metrics
        .latest_tx_checkpoint_sequence_number
        .set(last_checkpoint_seq as i64);
    metrics
        .total_tx_checkpoint_committed
        .inc_by(checkpoint_num as u64);
    metrics.total_transaction_committed.inc_by(tx_count as u64);
    metrics
        .transaction_per_checkpoint
        .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64);
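    // Record the DB commit latency normalized to a thousand transactions.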
    metrics
        .thousand_transaction_avg_db_commit_latency
        .observe(elapsed * 1000.0 / tx_count as f64);
}