iota_indexer/indexer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, env};

use anyhow::Result;
use async_trait::async_trait;
use iota_data_ingestion_core::{
    DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool,
};
use iota_metrics::spawn_monitored_task;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use prometheus::Registry;
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
    IndexerConfig, build_json_rpc_server,
    errors::IndexerError,
    handlers::{
        checkpoint_handler::new_handlers,
        objects_snapshot_handler::{SnapshotLagConfig, start_objects_snapshot_handler},
        pruner::Pruner,
    },
    indexer_reader::IndexerReader,
    metrics::IndexerMetrics,
    processors::processor_orchestrator::ProcessorOrchestrator,
    store::{IndexerAnalyticalStore, IndexerStore, PgIndexerStore},
};

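// Default size of the checkpoint download queue; overridable at runtime via
// the DOWNLOAD_QUEUE_SIZE environment variable.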
pub(crate) const DOWNLOAD_QUEUE_SIZE: usize = 200;
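// Default timeout for the checkpoint ingestion reader; overridable at runtime
// via the INGESTION_READER_TIMEOUT_SECS environment variable.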
const INGESTION_READER_TIMEOUT_SECS: u64 = 20;
// Limit indexing parallelism on big checkpoints to avoid OOM by capping the
// total size of a checkpoint batch at ~20MB. On testnet, most checkpoints
// are < 200KB, but some can reach 50MB.
const CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT: usize = 20_000_000;

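/// Entry points for the indexer's long-running services: the checkpoint
/// writer, the JSON-RPC reader, and the analytical worker.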
pub struct Indexer;

impl Indexer {
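    /// Starts the checkpoint writer with default settings: the default
    /// snapshot lag configuration, no pruning override, and a fresh
    /// cancellation token.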
    pub async fn start_writer(
        config: &IndexerConfig,
        store: PgIndexerStore,
        metrics: IndexerMetrics,
    ) -> Result<(), IndexerError> {
        let snapshot_config = SnapshotLagConfig::default();
        Indexer::start_writer_with_config(
            config,
            store,
            metrics,
            snapshot_config,
            None,
            CancellationToken::new(),
        )
        .await
    }

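    /// Starts the checkpoint writer with an explicit snapshot lag
    /// configuration, optional pruning (`epochs_to_keep`), and a
    /// caller-provided cancellation token.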
    pub async fn start_writer_with_config(
        config: &IndexerConfig,
        store: PgIndexerStore,
        metrics: IndexerMetrics,
        snapshot_config: SnapshotLagConfig,
        epochs_to_keep: Option<u64>,
        cancel: CancellationToken,
    ) -> Result<(), IndexerError> {
        info!(
            "IOTA Indexer Writer (version {:?}) started...",
            env!("CARGO_PKG_VERSION")
        );

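        // Resume from the checkpoint after the last one committed to the
        // database, or from checkpoint 0 on a fresh database.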
        let primary_watermark = store
            .get_latest_checkpoint_sequence_number()
            .await
            .expect("Failed to get latest tx checkpoint sequence number from DB")
            .map(|seq| seq + 1)
            .unwrap_or_default();
        let download_queue_size = env::var("DOWNLOAD_QUEUE_SIZE")
            .unwrap_or_else(|_| DOWNLOAD_QUEUE_SIZE.to_string())
            .parse::<usize>()
            .expect("Invalid DOWNLOAD_QUEUE_SIZE");
        let ingestion_reader_timeout_secs = env::var("INGESTION_READER_TIMEOUT_SECS")
            .unwrap_or_else(|_| INGESTION_READER_TIMEOUT_SECS.to_string())
            .parse::<u64>()
            .expect("Invalid INGESTION_READER_TIMEOUT_SECS");
        let data_limit = env::var("CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT")
            .unwrap_or_else(|_| CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT.to_string())
            .parse::<usize>()
            .expect("Invalid CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT");
        let extra_reader_options = ReaderOptions {
            batch_size: download_queue_size,
            timeout_secs: ingestion_reader_timeout_secs,
            data_limit,
            ..Default::default()
        };

        // Start the objects snapshot processor, which is a separate pipeline
        // with its own ingestion workflow.
        let (object_snapshot_worker, object_snapshot_watermark) = start_objects_snapshot_handler(
            store.clone(),
            metrics.clone(),
            snapshot_config,
            cancel.clone(),
        )
        .await?;
        let epochs_to_keep = epochs_to_keep.or_else(|| {
            env::var("EPOCHS_TO_KEEP")
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
        });
        if let Some(epochs_to_keep) = epochs_to_keep {
            info!(
                "Starting indexer pruner with epochs to keep: {}",
                epochs_to_keep
            );
            assert!(epochs_to_keep > 0, "Epochs to keep must be positive");
            let pruner: Pruner = Pruner::new(store.clone(), epochs_to_keep, metrics.clone())?;
            spawn_monitored_task!(pruner.start(CancellationToken::new()));
        }

        // If the chain identifier has already been indexed (i.e. the first
        // checkpoint has been committed), persist protocol configs for any
        // protocol versions not yet in the db. Otherwise, the persisting
        // happens in `commit_checkpoint` while the first checkpoint is being
        // indexed.
        if let Some(chain_id) = IndexerStore::get_chain_identifier(&store).await? {
            store.persist_protocol_configs_and_feature_flags(chain_id)?;
        }

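        // One executor drives both pipelines; each worker pool is registered
        // under the task name that keys its watermark in the progress store.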
        let mut executor = IndexerExecutor::new(
            ShimIndexerProgressStore::new(vec![
                ("primary".to_string(), primary_watermark),
                ("object_snapshot".to_string(), object_snapshot_watermark),
            ]),
            1,
            DataIngestionMetrics::new(&Registry::new()),
            cancel.child_token(),
        );
        let worker = new_handlers(store, metrics, primary_watermark, cancel.clone()).await?;
        let worker_pool = WorkerPool::new(
            worker,
            "primary".to_string(),
            download_queue_size,
            Default::default(),
        );

        executor.register(worker_pool).await?;

        let worker_pool = WorkerPool::new(
            object_snapshot_worker,
            "object_snapshot".to_string(),
            download_queue_size,
            Default::default(),
        );
        executor.register(worker_pool).await?;
        info!("Starting data ingestion executor...");
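        // When no local ingestion path is configured, fall back to a
        // throwaway temp dir; `into_path` persists it so it is not deleted
        // while the executor is running.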
        executor
            .run(
                config
                    .data_ingestion_path
                    .clone()
                    .unwrap_or_else(|| tempfile::tempdir().unwrap().into_path()),
                config.remote_store_url.clone(),
                vec![],
                extra_reader_options,
            )
            .await?;
        Ok(())
    }

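    /// Starts the JSON-RPC reader: builds the RPC server on top of an
    /// `IndexerReader` backed by `db_url` and blocks until the server stops.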
    pub async fn start_reader(
        config: &IndexerConfig,
        registry: &Registry,
        db_url: String,
    ) -> Result<(), IndexerError> {
        info!(
            "IOTA Indexer Reader (version {:?}) started...",
            env!("CARGO_PKG_VERSION")
        );
        let indexer_reader = IndexerReader::new(db_url)?;
        let handle = build_json_rpc_server(registry, indexer_reader, config, None)
            .await
            .expect("JSON-RPC server should not run into errors upon start.");
        tokio::spawn(async move { handle.stopped().await })
            .await
            .expect("RPC server task failed");

        Ok(())
    }
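
    /// Starts the analytical worker, which runs the processor orchestrator
    /// in an endless loop.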
    pub async fn start_analytical_worker<
        S: IndexerAnalyticalStore + Clone + Send + Sync + 'static,
    >(
        store: S,
        metrics: IndexerMetrics,
    ) -> Result<(), IndexerError> {
        info!(
            "IOTA Indexer Analytical Worker (version {:?}) started...",
            env!("CARGO_PKG_VERSION")
        );
        let mut processor_orchestrator = ProcessorOrchestrator::new(store, metrics);
        processor_orchestrator.run_forever().await;
        Ok(())
    }
}

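/// Minimal in-memory `ProgressStore` that serves the initial watermarks
/// supplied at construction. `save` is a no-op: on restart, the watermarks
/// are recovered from the database instead (see `start_writer_with_config`).
///
/// A minimal usage sketch (illustrative only; the type is crate-private and
/// the watermark value is arbitrary):
/// ```ignore
/// let mut progress = ShimIndexerProgressStore::new(vec![("primary".to_string(), 42)]);
/// assert_eq!(progress.load("primary".to_string()).await?, 42);
/// progress.save("primary".to_string(), 43).await?; // discarded
/// ```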
struct ShimIndexerProgressStore {
    watermarks: HashMap<String, CheckpointSequenceNumber>,
}

impl ShimIndexerProgressStore {
    fn new(watermarks: Vec<(String, CheckpointSequenceNumber)>) -> Self {
        Self {
            watermarks: watermarks.into_iter().collect(),
        }
    }
}

#[async_trait]
impl ProgressStore for ShimIndexerProgressStore {
    type Error = IndexerError;

    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
        Ok(*self.watermarks.get(&task_name).expect("missing watermark"))
    }

    async fn save(&mut self, _: String, _: CheckpointSequenceNumber) -> Result<(), Self::Error> {
        Ok(())
    }
}