use std::{collections::HashMap, env};

use anyhow::Result;
use async_trait::async_trait;
use iota_data_ingestion_core::{
    DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool,
};
use iota_metrics::spawn_monitored_task;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use prometheus::Registry;
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
    IndexerConfig, build_json_rpc_server,
    errors::IndexerError,
    handlers::{
        checkpoint_handler::new_handlers,
        objects_snapshot_handler::{SnapshotLagConfig, start_objects_snapshot_handler},
        pruner::Pruner,
    },
    indexer_reader::IndexerReader,
    metrics::IndexerMetrics,
    processors::processor_orchestrator::ProcessorOrchestrator,
    store::{IndexerAnalyticalStore, IndexerStore, PgIndexerStore},
};

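// Tuning defaults. Each can be overridden at runtime via an environment
// variable of the same name (see `start_writer_with_config`).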
pub(crate) const DOWNLOAD_QUEUE_SIZE: usize = 200;
const INGESTION_READER_TIMEOUT_SECS: u64 = 20;
const CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT: usize = 20_000_000;

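/// Entry points for the indexer's writer, reader, and analytical worker.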
pub struct Indexer;

impl Indexer {
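    /// Starts the indexer writer with the default [`SnapshotLagConfig`] and a
    /// fresh [`CancellationToken`], blocking until ingestion stops.
    ///
    /// A minimal call sketch, assuming `config`, `store`, and `metrics` have
    /// already been constructed:
    ///
    /// ```ignore
    /// Indexer::start_writer(&config, store, metrics).await?;
    /// ```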
    pub async fn start_writer(
        config: &IndexerConfig,
        store: PgIndexerStore,
        metrics: IndexerMetrics,
    ) -> Result<(), IndexerError> {
        let snapshot_config = SnapshotLagConfig::default();
        Indexer::start_writer_with_config(
            config,
            store,
            metrics,
            snapshot_config,
            None,
            CancellationToken::new(),
        )
        .await
    }

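    /// Starts the indexer writer with explicit snapshot-lag, pruning, and
    /// cancellation settings.
    ///
    /// The `DOWNLOAD_QUEUE_SIZE`, `INGESTION_READER_TIMEOUT_SECS`, and
    /// `CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT` environment variables override
    /// the compiled-in defaults; `EPOCHS_TO_KEEP` is consulted only when
    /// `epochs_to_keep` is `None`.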
    pub async fn start_writer_with_config(
        config: &IndexerConfig,
        store: PgIndexerStore,
        metrics: IndexerMetrics,
        snapshot_config: SnapshotLagConfig,
        epochs_to_keep: Option<u64>,
        cancel: CancellationToken,
    ) -> Result<(), IndexerError> {
        info!(
            "IOTA Indexer Writer (version {:?}) started...",
            env!("CARGO_PKG_VERSION")
        );

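        // Resume from the checkpoint after the last one committed to the
        // database; a fresh database starts from checkpoint 0.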
        let primary_watermark = store
            .get_latest_checkpoint_sequence_number()
            .await
            .expect("Failed to get latest tx checkpoint sequence number from DB")
            .map(|seq| seq + 1)
            .unwrap_or_default();
        let download_queue_size = env::var("DOWNLOAD_QUEUE_SIZE")
            .unwrap_or_else(|_| DOWNLOAD_QUEUE_SIZE.to_string())
            .parse::<usize>()
            .expect("Invalid DOWNLOAD_QUEUE_SIZE");
        let ingestion_reader_timeout_secs = env::var("INGESTION_READER_TIMEOUT_SECS")
            .unwrap_or_else(|_| INGESTION_READER_TIMEOUT_SECS.to_string())
            .parse::<u64>()
            .expect("Invalid INGESTION_READER_TIMEOUT_SECS");
        let data_limit = env::var("CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT")
            .unwrap_or_else(|_| CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT.to_string())
            .parse::<usize>()
            .expect("Invalid CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT");
        let extra_reader_options = ReaderOptions {
            batch_size: download_queue_size,
            timeout_secs: ingestion_reader_timeout_secs,
            data_limit,
            ..Default::default()
        };

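        // The objects snapshot handler maintains the objects snapshot at a
        // configurable lag behind the latest checkpoint (see
        // `SnapshotLagConfig`).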
        let (object_snapshot_worker, object_snapshot_watermark) = start_objects_snapshot_handler(
            store.clone(),
            metrics.clone(),
            snapshot_config,
            cancel.clone(),
        )
        .await?;

        let epochs_to_keep = epochs_to_keep.or_else(|| {
            env::var("EPOCHS_TO_KEEP")
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
        });
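        // If pruning is enabled, spawn a background task that retains only
        // the most recent `epochs_to_keep` epochs.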
        if let Some(epochs_to_keep) = epochs_to_keep {
            info!(
                "Starting indexer pruner with epochs to keep: {}",
                epochs_to_keep
            );
            assert!(epochs_to_keep > 0, "Epochs to keep must be positive");
            let pruner: Pruner = Pruner::new(store.clone(), epochs_to_keep, metrics.clone())?;
            // Use a child of the shared token so the pruner also observes shutdown.
            spawn_monitored_task!(pruner.start(cancel.child_token()));
        }

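        // Persist protocol configs and feature flags once the chain
        // identifier is known.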
        if let Some(chain_id) = IndexerStore::get_chain_identifier(&store).await? {
            store.persist_protocol_configs_and_feature_flags(chain_id)?;
        }

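        // Run the "primary" and "object_snapshot" pipelines on a single
        // ingestion executor, each seeded with its own starting watermark.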
        let mut executor = IndexerExecutor::new(
            ShimIndexerProgressStore::new(vec![
                ("primary".to_string(), primary_watermark),
                ("object_snapshot".to_string(), object_snapshot_watermark),
            ]),
            1,
            DataIngestionMetrics::new(&Registry::new()),
            cancel.child_token(),
        );
        let worker = new_handlers(store, metrics, primary_watermark, cancel.clone()).await?;
        let worker_pool = WorkerPool::new(
            worker,
            "primary".to_string(),
            download_queue_size,
            Default::default(),
        );

        executor.register(worker_pool).await?;

        let worker_pool = WorkerPool::new(
            object_snapshot_worker,
            "object_snapshot".to_string(),
            download_queue_size,
            Default::default(),
        );
        executor.register(worker_pool).await?;
        info!("Starting data ingestion executor...");
        executor
            .run(
                config.data_ingestion_path.clone().unwrap_or_else(|| {
                    // Create the fallback temp dir lazily, only when no path
                    // is configured.
                    tempfile::tempdir()
                        .expect("Failed to create temp dir for data ingestion")
                        .into_path()
                }),
                config.remote_store_url.clone(),
                vec![],
                extra_reader_options,
            )
            .await?;
        Ok(())
    }

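    /// Starts the indexer JSON-RPC reader and blocks until the server stops.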
    pub async fn start_reader(
        config: &IndexerConfig,
        registry: &Registry,
        db_url: String,
    ) -> Result<(), IndexerError> {
        info!(
            "IOTA Indexer Reader (version {:?}) started...",
            env!("CARGO_PKG_VERSION")
        );
        let indexer_reader = IndexerReader::new(db_url)?;
        let handle = build_json_rpc_server(registry, indexer_reader, config, None)
            .await
            .expect("JSON-RPC server should not run into errors upon start.");
        tokio::spawn(async move { handle.stopped().await })
            .await
            .expect("RPC server task failed");

        Ok(())
    }
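
    /// Starts the analytical worker, which runs the processor orchestrator
    /// indefinitely.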
    pub async fn start_analytical_worker<
        S: IndexerAnalyticalStore + Clone + Send + Sync + 'static,
    >(
        store: S,
        metrics: IndexerMetrics,
    ) -> Result<(), IndexerError> {
        info!(
            "IOTA Indexer Analytical Worker (version {:?}) started...",
            env!("CARGO_PKG_VERSION")
        );
        let mut processor_orchestrator = ProcessorOrchestrator::new(store, metrics);
        processor_orchestrator.run_forever().await;
        Ok(())
    }
}

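/// In-memory [`ProgressStore`] that seeds the ingestion executor with the
/// starting watermark of each registered pipeline.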
struct ShimIndexerProgressStore {
    watermarks: HashMap<String, CheckpointSequenceNumber>,
}

impl ShimIndexerProgressStore {
    fn new(watermarks: Vec<(String, CheckpointSequenceNumber)>) -> Self {
        Self {
            watermarks: watermarks.into_iter().collect(),
        }
    }
}

#[async_trait]
impl ProgressStore for ShimIndexerProgressStore {
    type Error = IndexerError;

    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
        Ok(*self.watermarks.get(&task_name).expect("missing watermark"))
    }

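    // Intentionally a no-op: on restart, the writer recovers its position
    // from the database (see `primary_watermark` in
    // `start_writer_with_config`).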
    async fn save(&mut self, _: String, _: CheckpointSequenceNumber) -> Result<(), Self::Error> {
        Ok(())
    }
}