iota_indexer/handlers/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::BTreeMap;
6
7use async_trait::async_trait;
8use futures::{FutureExt, StreamExt};
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12    errors::IndexerError,
13    models::{display::StoredDisplay, obj_indices::StoredObjectVersion},
14    types::{
15        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent,
16        IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult, TxIndex,
17    },
18};
19
20pub mod checkpoint_handler;
21pub mod committer;
22pub mod objects_snapshot_handler;
23pub mod pruner;
24pub mod tx_processor;
25
/// Default chunk size used by `CommonHandler::start_transform_and_load` when
/// grouping items read from the checkpoint channel. It applies when the
/// `CHECKPOINT_COMMIT_BATCH_SIZE` environment variable is unset or cannot be
/// parsed.
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
/// Maximum number of out-of-order checkpoints that
/// `CommonHandler::start_transform_and_load` keeps buffered. Once the buffer
/// holds this many entries, it stops reading from the channel until entries
/// have been drained.
pub(crate) const UNPROCESSED_CHECKPOINT_SIZE_LIMIT: usize = 1000;
28
/// Indexed data grouped around a single [`IndexedCheckpoint`], ready to be
/// committed.
///
/// NOTE(review): the producer and consumer of this struct live outside this
/// file; the per-field notes below are derived from the field names and types
/// only.
#[derive(Debug)]
pub struct CheckpointDataToCommit {
    /// The indexed checkpoint itself.
    pub checkpoint: IndexedCheckpoint,
    /// Indexed transactions accompanying the checkpoint.
    pub transactions: Vec<IndexedTransaction>,
    /// Indexed events accompanying the checkpoint.
    pub events: Vec<IndexedEvent>,
    /// Index entries for the events.
    pub event_indices: Vec<EventIndex>,
    /// Index entries for the transactions.
    pub tx_indices: Vec<TxIndex>,
    /// Display updates, keyed by a string.
    /// NOTE(review): presumably the key is the object type; confirm with the producer.
    pub display_updates: BTreeMap<String, StoredDisplay>,
    /// Changed and deleted objects.
    /// NOTE(review): presumably applied to the live objects table; confirm.
    pub object_changes: TransactionObjectChangesToCommit,
    /// Changed and deleted objects.
    /// NOTE(review): presumably applied to the objects history table; confirm.
    pub object_history_changes: TransactionObjectChangesToCommit,
    /// Stored object version records.
    pub object_versions: Vec<StoredObjectVersion>,
    /// Indexed packages.
    pub packages: Vec<IndexedPackage>,
    /// Epoch data to commit alongside the checkpoint, if any.
    /// NOTE(review): presumably only set on an epoch boundary; confirm.
    pub epoch: Option<EpochToCommit>,
}
43
/// A set of object changes to commit, split into objects that were changed
/// and objects that were deleted.
#[derive(Clone, Debug)]
pub struct TransactionObjectChangesToCommit {
    /// Objects recorded as changed.
    pub changed_objects: Vec<IndexedObject>,
    /// Objects recorded as deleted.
    pub deleted_objects: Vec<IndexedDeletedObject>,
}
49
/// Epoch information to commit: the new epoch, optionally together with the
/// previous one.
#[derive(Clone, Debug)]
pub struct EpochToCommit {
    /// Information about the previous epoch, if there is one.
    /// NOTE(review): presumably `None` only for the very first epoch; confirm.
    pub last_epoch: Option<IndexedEpochInfo>,
    /// Information about the epoch that is starting.
    pub new_epoch: IndexedEpochInfo,
    /// Network-wide transaction count.
    /// NOTE(review): the exact point in time this count refers to is not
    /// visible in this file; confirm with the producer.
    pub network_total_transactions: u64,
}
56
/// Generic driver around a boxed [`Handler`].
///
/// It owns the handler and runs the shared receive/order/commit loop
/// (`start_transform_and_load`) on its behalf.
pub struct CommonHandler<T> {
    /// The wrapped handler that performs the actual loading and watermark
    /// bookkeeping.
    handler: Box<dyn Handler<T>>,
}
60
61impl<T> CommonHandler<T> {
62    pub fn new(handler: Box<dyn Handler<T>>) -> Self {
63        Self { handler }
64    }
65
66    async fn start_transform_and_load(
67        &self,
68        cp_receiver: iota_metrics::metered_channel::Receiver<(u64, T)>,
69        cancel: CancellationToken,
70    ) -> IndexerResult<()> {
71        let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
72            .ok()
73            .and_then(|val| val.parse().ok())
74            .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE);
75        let mut stream = iota_metrics::metered_channel::ReceiverStream::new(cp_receiver)
76            .ready_chunks(checkpoint_commit_batch_size);
77
78        let mut unprocessed = BTreeMap::new();
79        let mut tuple_batch = vec![];
80        let mut next_cp_to_process = self
81            .handler
82            .get_watermark_hi()
83            .await?
84            .map(|n| n.saturating_add(1))
85            .unwrap_or_default();
86
87        loop {
88            if cancel.is_cancelled() {
89                return Ok(());
90            }
91
92            // Try to fetch new data tuple from the stream
93            if unprocessed.len() >= UNPROCESSED_CHECKPOINT_SIZE_LIMIT {
94                tracing::debug!(
95                    "Unprocessed checkpoint size reached limit {UNPROCESSED_CHECKPOINT_SIZE_LIMIT}, skip reading from stream..."
96                );
97            } else {
98                // Try to fetch new data tuple from the stream
99                match stream.next().now_or_never() {
100                    Some(Some(tuple_chunk)) => {
101                        if cancel.is_cancelled() {
102                            return Ok(());
103                        }
104                        for (cp_seq, data) in tuple_chunk {
105                            unprocessed.insert(cp_seq, (cp_seq, data));
106                        }
107                    }
108                    Some(None) => break, // Stream has ended
109                    None => {}           // No new data tuple available right now
110                }
111            }
112
113            // Process unprocessed checkpoints, even no new checkpoints from stream
114            let checkpoint_lag_limiter = self.handler.get_max_committable_checkpoint().await?;
115            while next_cp_to_process <= checkpoint_lag_limiter {
116                if let Some(data_tuple) = unprocessed.remove(&next_cp_to_process) {
117                    tuple_batch.push(data_tuple);
118                    next_cp_to_process += 1;
119                } else {
120                    break;
121                }
122            }
123
124            if !tuple_batch.is_empty() && checkpoint_lag_limiter != 0 {
125                let tuple_batch = std::mem::take(&mut tuple_batch);
126                let (last_checkpoint_seq, _data) = tuple_batch.last().unwrap();
127                let last_checkpoint_seq = last_checkpoint_seq.to_owned();
128                let batch = tuple_batch
129                    .into_iter()
130                    .map(|(_cp_seq, data)| data)
131                    .collect();
132                self.handler.load(batch).await.map_err(|e| {
133                    IndexerError::PostgresWrite(format!(
134                        "Failed to load transformed data into DB for handler {}: {e}",
135                        self.handler.name()
136                    ))
137                })?;
138                self.handler.set_watermark_hi(last_checkpoint_seq).await?;
139            }
140        }
141        Err(IndexerError::ChannelClosed(format!(
142            "Checkpoint channel is closed unexpectedly for handler {}",
143            self.handler.name()
144        )))
145    }
146}
147
/// Behaviour a concrete handler must provide so that [`CommonHandler`] can
/// commit batches of data of type `T` and track progress via a high
/// watermark.
#[async_trait]
pub trait Handler<T>: Send + Sync {
    /// Returns the handler name; it is included in error messages.
    fn name(&self) -> String;

    /// Commits a batch of transformed data to the DB.
    async fn load(&self, batch: Vec<T>) -> IndexerResult<()>;

    /// Reads the high watermark of the handler's table from the DB.
    ///
    /// Returns `None` when no watermark exists yet.
    async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>>;

    /// Sets the high watermark of the handler's table in the DB and also
    /// updates metrics.
    async fn set_watermark_hi(&self, watermark_hi: u64) -> IndexerResult<()>;

    /// Returns the highest checkpoint sequence number that may currently be
    /// committed.
    ///
    /// The default implementation returns `u64::MAX`, meaning no extra
    /// waiting is needed before committing. Handlers that must wait for some
    /// condition can override it; one use-case is the objects snapshot
    /// handler, which waits for the lag between the snapshot and the latest
    /// checkpoint to reach a certain threshold.
    ///
    /// Note that [`CommonHandler`] does not commit anything while this
    /// returns `0`.
    async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
        Ok(u64::MAX)
    }
}