iota_indexer/handlers/
mod.rs1use std::collections::BTreeMap;
6
7use async_trait::async_trait;
8use futures::{FutureExt, StreamExt};
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12 errors::IndexerError,
13 models::{display::StoredDisplay, obj_indices::StoredObjectVersion},
14 types::{
15 EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent,
16 IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult, TxIndex,
17 },
18};
19
20pub mod checkpoint_handler;
21pub mod committer;
22pub mod objects_snapshot_handler;
23pub mod pruner;
24pub mod tx_processor;
25
/// Default number of channel items grouped into one chunk by
/// `CommonHandler::start_transform_and_load`; used when the
/// `CHECKPOINT_COMMIT_BATCH_SIZE` environment variable is unset or cannot be parsed.
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;

/// Once this many checkpoints are buffered while waiting to be processed,
/// `CommonHandler::start_transform_and_load` stops pulling new data from its stream.
pub(crate) const UNPROCESSED_CHECKPOINT_SIZE_LIMIT: usize = 1_000;
28
29#[derive(Debug)]
30pub struct CheckpointDataToCommit {
31 pub checkpoint: IndexedCheckpoint,
32 pub transactions: Vec<IndexedTransaction>,
33 pub events: Vec<IndexedEvent>,
34 pub event_indices: Vec<EventIndex>,
35 pub tx_indices: Vec<TxIndex>,
36 pub display_updates: BTreeMap<String, StoredDisplay>,
37 pub object_changes: TransactionObjectChangesToCommit,
38 pub object_history_changes: TransactionObjectChangesToCommit,
39 pub object_versions: Vec<StoredObjectVersion>,
40 pub packages: Vec<IndexedPackage>,
41 pub epoch: Option<EpochToCommit>,
42}
43
44#[derive(Clone, Debug)]
45pub struct TransactionObjectChangesToCommit {
46 pub changed_objects: Vec<IndexedObject>,
47 pub deleted_objects: Vec<IndexedDeletedObject>,
48}
49
50#[derive(Clone, Debug)]
51pub struct EpochToCommit {
52 pub last_epoch: Option<IndexedEpochInfo>,
53 pub new_epoch: IndexedEpochInfo,
54 pub network_total_transactions: u64,
55}
56
57pub struct CommonHandler<T> {
58 handler: Box<dyn Handler<T>>,
59}
60
61impl<T> CommonHandler<T> {
62 pub fn new(handler: Box<dyn Handler<T>>) -> Self {
63 Self { handler }
64 }
65
66 async fn start_transform_and_load(
67 &self,
68 cp_receiver: iota_metrics::metered_channel::Receiver<(u64, T)>,
69 cancel: CancellationToken,
70 ) -> IndexerResult<()> {
71 let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
72 .ok()
73 .and_then(|val| val.parse().ok())
74 .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE);
75 let mut stream = iota_metrics::metered_channel::ReceiverStream::new(cp_receiver)
76 .ready_chunks(checkpoint_commit_batch_size);
77
78 let mut unprocessed = BTreeMap::new();
79 let mut tuple_batch = vec![];
80 let mut next_cp_to_process = self
81 .handler
82 .get_watermark_hi()
83 .await?
84 .map(|n| n.saturating_add(1))
85 .unwrap_or_default();
86
87 loop {
88 if cancel.is_cancelled() {
89 return Ok(());
90 }
91
92 if unprocessed.len() >= UNPROCESSED_CHECKPOINT_SIZE_LIMIT {
94 tracing::debug!(
95 "Unprocessed checkpoint size reached limit {UNPROCESSED_CHECKPOINT_SIZE_LIMIT}, skip reading from stream..."
96 );
97 } else {
98 match stream.next().now_or_never() {
100 Some(Some(tuple_chunk)) => {
101 if cancel.is_cancelled() {
102 return Ok(());
103 }
104 for (cp_seq, data) in tuple_chunk {
105 unprocessed.insert(cp_seq, (cp_seq, data));
106 }
107 }
108 Some(None) => break, None => {} }
111 }
112
113 let checkpoint_lag_limiter = self.handler.get_max_committable_checkpoint().await?;
115 while next_cp_to_process <= checkpoint_lag_limiter {
116 if let Some(data_tuple) = unprocessed.remove(&next_cp_to_process) {
117 tuple_batch.push(data_tuple);
118 next_cp_to_process += 1;
119 } else {
120 break;
121 }
122 }
123
124 if !tuple_batch.is_empty() && checkpoint_lag_limiter != 0 {
125 let tuple_batch = std::mem::take(&mut tuple_batch);
126 let (last_checkpoint_seq, _data) = tuple_batch.last().unwrap();
127 let last_checkpoint_seq = last_checkpoint_seq.to_owned();
128 let batch = tuple_batch
129 .into_iter()
130 .map(|(_cp_seq, data)| data)
131 .collect();
132 self.handler.load(batch).await.map_err(|e| {
133 IndexerError::PostgresWrite(format!(
134 "Failed to load transformed data into DB for handler {}: {e}",
135 self.handler.name()
136 ))
137 })?;
138 self.handler.set_watermark_hi(last_checkpoint_seq).await?;
139 }
140 }
141 Err(IndexerError::ChannelClosed(format!(
142 "Checkpoint channel is closed unexpectedly for handler {}",
143 self.handler.name()
144 )))
145 }
146}
147
148#[async_trait]
149pub trait Handler<T>: Send + Sync {
150 fn name(&self) -> String;
152
153 async fn load(&self, batch: Vec<T>) -> IndexerResult<()>;
155
156 async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>>;
158
159 async fn set_watermark_hi(&self, watermark_hi: u64) -> IndexerResult<()>;
161
162 async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
168 Ok(u64::MAX)
169 }
170}