1use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use url::Url;
13
14use crate::{
15 IndexerMetrics,
16 config::{
17 IngestionConfig, IotaNamesOptions, PruningOptions, RetentionConfig, SnapshotLagConfig,
18 },
19 db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
20 errors::IndexerError,
21 indexer::Indexer,
22 store::{PgIndexerAnalyticalStore, PgIndexerStore},
23};
24
25pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
62
63pub enum IndexerTypeConfig {
64 Reader {
65 reader_mode_rpc_url: String,
66 },
67 Writer {
68 snapshot_config: SnapshotLagConfig,
69 retention_config: Option<RetentionConfig>,
70 },
71 AnalyticalWorker,
72}
73
74impl IndexerTypeConfig {
75 pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
76 Self::Reader {
77 reader_mode_rpc_url,
78 }
79 }
80
81 pub fn writer_mode(
82 snapshot_config: Option<SnapshotLagConfig>,
83 pruning_options: Option<PruningOptions>,
84 ) -> Self {
85 Self::Writer {
86 snapshot_config: snapshot_config.unwrap_or_default(),
87 retention_config: pruning_options.as_ref().and_then(|pruning_options| {
88 pruning_options
89 .epochs_to_keep
90 .map(RetentionConfig::new_with_default_retention_only_for_testing)
91 }),
92 }
93 }
94}
95
96pub async fn start_test_indexer(
97 db_url: String,
98 reset_db: bool,
99 db_init_hook: Option<DBInitHook>,
100 rpc_url: String,
101 reader_writer_config: IndexerTypeConfig,
102 data_ingestion_path: Option<PathBuf>,
103) -> (
104 PgIndexerStore,
105 JoinHandle<Result<(), IndexerError>>,
106 CancellationToken,
107) {
108 let token = CancellationToken::new();
109 let (store, handle) = start_test_indexer_impl(
110 db_url,
111 reset_db,
112 db_init_hook,
113 rpc_url,
114 reader_writer_config,
115 data_ingestion_path,
116 token.clone(),
117 )
118 .await;
119 (store, handle, token)
120}
121
122pub async fn start_test_indexer_impl(
132 db_url: String,
133 reset_db: bool,
134 db_init_hook: Option<DBInitHook>,
135 rpc_url: String,
136 reader_writer_config: IndexerTypeConfig,
137 data_ingestion_path: Option<PathBuf>,
138 cancel: CancellationToken,
139) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
140 let store = create_pg_store(&db_url, reset_db);
141 if reset_db {
142 crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
143 }
144 if let Some(db_init_hook) = db_init_hook {
145 db_init_hook(&store);
146 }
147
148 let registry = prometheus::Registry::default();
149 init_metrics(®istry);
150 let indexer_metrics = IndexerMetrics::new(®istry);
151
152 let handle = match reader_writer_config {
153 IndexerTypeConfig::Reader {
154 reader_mode_rpc_url,
155 } => {
156 let config = crate::config::JsonRpcConfig {
157 iota_names_options: IotaNamesOptions::default(),
158 historic_fallback_options: Default::default(),
159 rpc_address: reader_mode_rpc_url.parse().unwrap(),
160 rpc_client_url: rpc_url,
161 };
162 let pool = store.blocking_cp();
163 let store_clone = store.clone();
164 tokio::spawn(async move {
165 Indexer::start_reader(
166 &config,
167 store_clone,
168 ®istry,
169 pool,
170 indexer_metrics,
171 CancellationToken::new(),
172 )
173 .await
174 })
175 }
176 IndexerTypeConfig::Writer {
177 snapshot_config,
178 retention_config,
179 } => {
180 let fullnode_rpc_url = rpc_url.parse::<Url>().unwrap();
181 let store_clone = store.clone();
182 let mut ingestion_config = IngestionConfig::default();
183 ingestion_config.sources.remote_store_url =
184 data_ingestion_path.is_none().then_some(fullnode_rpc_url);
185 ingestion_config.sources.data_ingestion_path = data_ingestion_path;
186
187 tokio::spawn(async move {
188 Indexer::start_writer_with_config(
189 &ingestion_config,
190 store_clone,
191 indexer_metrics,
192 snapshot_config,
193 retention_config,
194 cancel,
195 )
196 .await
197 })
198 }
199 IndexerTypeConfig::AnalyticalWorker => {
200 let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
201
202 tokio::spawn(
203 async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
204 )
205 }
206 };
207
208 (store, handle)
209}
210
211pub struct TestDatabase {
213 pub url: String,
214 db_name: String,
215 connection: PoolConnection,
216 pool_config: ConnectionPoolConfig,
217}
218
219impl TestDatabase {
220 pub fn new(db_url: String) -> Self {
221 let pool_config = ConnectionPoolConfig {
224 pool_size: 5,
225 ..Default::default()
226 };
227
228 let db_name = db_url.split('/').next_back().unwrap().into();
229 let (default_url, _) = replace_db_name(&db_url, "postgres");
230 let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
231 let connection = blocking_pool.get().unwrap();
232 Self {
233 url: db_url,
234 db_name,
235 connection,
236 pool_config,
237 }
238 }
239
240 pub fn drop_if_exists(&mut self) {
242 self.connection
243 .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
244 .unwrap();
245 }
246
247 pub fn create(&mut self) {
249 self.connection
250 .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
251 .unwrap();
252 }
253
254 pub fn recreate(&mut self) {
256 self.drop_if_exists();
257 self.create();
258 }
259
260 pub fn to_connection_pool(&self) -> ConnectionPool {
262 new_connection_pool(&self.url, &self.pool_config).unwrap()
263 }
264
265 pub fn reset_db(&mut self) {
266 crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
267 }
268}
269
270pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
271 let registry = prometheus::Registry::default();
272 init_metrics(®istry);
273 let indexer_metrics = IndexerMetrics::new(®istry);
274
275 let mut test_db = TestDatabase::new(db_url.to_string());
276 if reset_database {
277 test_db.recreate();
278 }
279
280 PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics)
281}
282
283fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
284 let pos = db_url.rfind('/').expect("unable to find / in db_url");
285 let old_db_name = &db_url[pos + 1..];
286
287 (
288 format!("{}/{}", &db_url[..pos], new_db_name),
289 old_db_name.to_string(),
290 )
291}
292
293pub async fn force_delete_database(db_url: String) {
294 let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
299 let mut pool_config = ConnectionPoolConfig::default();
300 pool_config.set_pool_size(1);
301
302 let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
303 blocking_pool
304 .get()
305 .unwrap()
306 .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
307 .unwrap();
308}
309
310#[derive(Clone)]
311pub struct IotaTransactionBlockResponseBuilder<'a> {
312 response: IotaTransactionBlockResponse,
313 full_response: &'a IotaTransactionBlockResponse,
314}
315
316impl<'a> IotaTransactionBlockResponseBuilder<'a> {
317 pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
318 Self {
319 response: IotaTransactionBlockResponse::default(),
320 full_response,
321 }
322 }
323
324 pub fn with_input(mut self) -> Self {
325 self.response = IotaTransactionBlockResponse {
326 transaction: self.full_response.transaction.clone(),
327 ..self.response
328 };
329 self
330 }
331
332 pub fn with_raw_input(mut self) -> Self {
333 self.response = IotaTransactionBlockResponse {
334 raw_transaction: self.full_response.raw_transaction.clone(),
335 ..self.response
336 };
337 self
338 }
339
340 pub fn with_effects(mut self) -> Self {
341 self.response = IotaTransactionBlockResponse {
342 effects: self.full_response.effects.clone(),
343 ..self.response
344 };
345 self
346 }
347
348 pub fn with_events(mut self) -> Self {
349 self.response = IotaTransactionBlockResponse {
350 events: self.full_response.events.clone(),
351 ..self.response
352 };
353 self
354 }
355
356 pub fn with_balance_changes(mut self) -> Self {
357 self.response = IotaTransactionBlockResponse {
358 balance_changes: self.full_response.balance_changes.clone(),
359 ..self.response
360 };
361 self
362 }
363
364 pub fn with_object_changes(mut self) -> Self {
365 self.response = IotaTransactionBlockResponse {
366 object_changes: self.full_response.object_changes.clone(),
367 ..self.response
368 };
369 self
370 }
371
372 pub fn with_input_and_changes(mut self) -> Self {
373 self.response = IotaTransactionBlockResponse {
374 transaction: self.full_response.transaction.clone(),
375 balance_changes: self.full_response.balance_changes.clone(),
376 object_changes: self.full_response.object_changes.clone(),
377 ..self.response
378 };
379 self
380 }
381
382 pub fn build(self) -> IotaTransactionBlockResponse {
383 IotaTransactionBlockResponse {
384 transaction: self.response.transaction,
385 raw_transaction: self.response.raw_transaction,
386 effects: self.response.effects,
387 events: self.response.events,
388 balance_changes: self.response.balance_changes,
389 object_changes: self.response.object_changes,
390 ..self.full_response.clone()
392 }
393 }
394}
395
396pub fn db_url(db_name: &str) -> String {
400 format!("postgres://postgres:postgrespw@localhost:5432/{db_name}")
401}
402
403#[derive(QueryableByName, Debug)]
405pub struct RowCount {
406 #[diesel(sql_type = BigInt)]
407 pub cnt: i64,
408}