Skip to main content

iota_indexer/
test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use url::Url;
13
14use crate::{
15    IndexerMetrics,
16    config::{IngestionConfig, IotaNamesOptions, PruningOptions, RetentionConfig},
17    db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
18    errors::IndexerError,
19    indexer::Indexer,
20    store::{PgIndexerAnalyticalStore, PgIndexerStore},
21};
22
23/// Shorter pruning delay used by the test indexer.
24const TEST_PRUNING_DELAY_MS: u64 = 1000; // 1 second
25
26/// Type to create hooks to alter initial indexer DB state in tests.
27/// Those hooks are meant to be called after DB reset (if it occurs) and before
28/// indexer is started.
29///
30/// Example:
31///
32/// ```ignore
33/// let emulate_insertion_order_set_earlier_by_optimistic_indexing: DBInitHook =
34///     Box::new(move |pg_store: &PgIndexerStore| {
35///         transactional_blocking_with_retry!(
36///             &pg_store.blocking_cp(),
37///             |conn| {
38///                 insert_or_ignore_into!(
39///                     tx_insertion_order::table,
40///                     (
41///                         tx_insertion_order::dsl::tx_digest.eq(digest.inner().to_vec()),
42///                         tx_insertion_order::dsl::insertion_order.eq(123),
43///                     ),
44///                     conn
45///                 );
46///                 Ok::<(), IndexerError>(())
47///             },
48///             Duration::from_secs(60)
49///         )
50///             .unwrap()
51///     });
52///
53/// let (_, pg_store, _) = start_simulacrum_grpc_with_write_indexer(
54///     Arc::new(sim),
55///     data_ingestion_path,
56///     None,
57///     Some("indexer_ingestion_tests_db"),
58///     Some(emulate_insertion_order_set_earlier_by_optimistic_indexing),
59/// )
60/// .await;
61/// ```
62pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
63
64pub enum IndexerTypeConfig {
65    Reader {
66        reader_mode_rpc_url: String,
67    },
68    Writer {
69        retention_config: Option<RetentionConfig>,
70        pruning_delay_ms: u64,
71        pruning_batch_size: u64,
72    },
73    AnalyticalWorker,
74}
75
76impl IndexerTypeConfig {
77    pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
78        Self::Reader {
79            reader_mode_rpc_url,
80        }
81    }
82
83    pub fn writer_mode(pruning_options: Option<PruningOptions>) -> Self {
84        let opts = pruning_options.unwrap_or_default();
85        Self::Writer {
86            retention_config: opts
87                .epochs_to_keep
88                .map(RetentionConfig::new_with_default_retention_only_for_testing),
89            pruning_delay_ms: TEST_PRUNING_DELAY_MS,
90            pruning_batch_size: opts.pruning_batch_size,
91        }
92    }
93}
94
95pub async fn start_test_indexer(
96    db_url: String,
97    reset_db: bool,
98    db_init_hook: Option<DBInitHook>,
99    rpc_url: String,
100    reader_writer_config: IndexerTypeConfig,
101    data_ingestion_path: Option<PathBuf>,
102) -> (
103    PgIndexerStore,
104    JoinHandle<Result<(), IndexerError>>,
105    CancellationToken,
106) {
107    let token = CancellationToken::new();
108    let (store, handle) = start_test_indexer_impl(
109        db_url,
110        reset_db,
111        db_init_hook,
112        rpc_url,
113        reader_writer_config,
114        data_ingestion_path,
115        token.clone(),
116    )
117    .await;
118    (store, handle, token)
119}
120
121/// Starts an indexer reader or writer for testing depending on the
122/// `reader_writer_config`.
123///
124/// # Note
125/// For [`IndexerTypeConfig::Writer`] when `data_ingestion_path` is `Some`, the
126/// data ingestion path will be exclusively used to ingest data into the
127/// indexer. To force the indexer to sync from the fullnode via gRPC, set
128/// `data_ingestion_path` to `None` and it will use the `rpc_url` to stream
129/// checkpoints from the fullnode gRPC endpoint.
130pub async fn start_test_indexer_impl(
131    db_url: String,
132    reset_db: bool,
133    db_init_hook: Option<DBInitHook>,
134    rpc_url: String,
135    reader_writer_config: IndexerTypeConfig,
136    data_ingestion_path: Option<PathBuf>,
137    cancel: CancellationToken,
138) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
139    let store = create_pg_store(&db_url, reset_db);
140    if reset_db {
141        crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
142    }
143    if let Some(db_init_hook) = db_init_hook {
144        db_init_hook(&store);
145    }
146
147    let registry = prometheus::Registry::default();
148    init_metrics(&registry);
149    let indexer_metrics = IndexerMetrics::new(&registry);
150
151    let handle = match reader_writer_config {
152        IndexerTypeConfig::Reader {
153            reader_mode_rpc_url,
154        } => {
155            let config = crate::config::JsonRpcConfig {
156                iota_names_options: IotaNamesOptions::default(),
157                historic_fallback_options: Default::default(),
158                rpc_address: reader_mode_rpc_url.parse().unwrap(),
159                rpc_client_url: rpc_url,
160            };
161            let pool = store.blocking_cp();
162            let store_clone = store.clone();
163            tokio::spawn(async move {
164                Indexer::start_reader(
165                    &config,
166                    store_clone,
167                    &registry,
168                    pool,
169                    indexer_metrics,
170                    CancellationToken::new(),
171                )
172                .await
173            })
174        }
175        IndexerTypeConfig::Writer {
176            retention_config,
177            pruning_delay_ms,
178            pruning_batch_size,
179        } => {
180            let fullnode_rpc_url = rpc_url.parse::<Url>().unwrap();
181            let store_clone = store.clone();
182            let mut ingestion_config = IngestionConfig::default();
183            ingestion_config.sources.remote_store_url =
184                data_ingestion_path.is_none().then_some(fullnode_rpc_url);
185            ingestion_config.sources.data_ingestion_path = data_ingestion_path;
186
187            tokio::spawn(async move {
188                Indexer::start_writer_with_config(
189                    &ingestion_config,
190                    store_clone,
191                    indexer_metrics,
192                    retention_config,
193                    pruning_delay_ms,
194                    pruning_batch_size,
195                    cancel,
196                )
197                .await
198            })
199        }
200        IndexerTypeConfig::AnalyticalWorker => {
201            let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
202
203            tokio::spawn(
204                async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
205            )
206        }
207    };
208
209    (store, handle)
210}
211
212/// Manage a test database for integration tests.
213pub struct TestDatabase {
214    pub url: String,
215    db_name: String,
216    connection: PoolConnection,
217    pool_config: ConnectionPoolConfig,
218}
219
220impl TestDatabase {
221    pub fn new(db_url: String) -> Self {
222        // Reduce the connection pool size to 5 for testing
223        // to prevent maxing out
224        let pool_config = ConnectionPoolConfig {
225            pool_size: 5,
226            ..Default::default()
227        };
228
229        let db_name = db_url.split('/').next_back().unwrap().into();
230        let (default_url, _) = replace_db_name(&db_url, "postgres");
231        let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
232        let connection = blocking_pool.get().unwrap();
233        Self {
234            url: db_url,
235            db_name,
236            connection,
237            pool_config,
238        }
239    }
240
241    /// Drop the database in the server if it exists.
242    pub fn drop_if_exists(&mut self) {
243        self.connection
244            .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
245            .unwrap();
246    }
247
248    /// Create the database in the server.
249    pub fn create(&mut self) {
250        self.connection
251            .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
252            .unwrap();
253    }
254
255    /// Drop and recreate the database in the server.
256    pub fn recreate(&mut self) {
257        self.drop_if_exists();
258        self.create();
259    }
260
261    /// Create a new connection pool to the database.
262    pub fn to_connection_pool(&self) -> ConnectionPool {
263        new_connection_pool(&self.url, &self.pool_config).unwrap()
264    }
265
266    pub fn reset_db(&mut self) {
267        crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
268    }
269}
270
271pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
272    let registry = prometheus::Registry::default();
273    init_metrics(&registry);
274    let indexer_metrics = IndexerMetrics::new(&registry);
275
276    let mut test_db = TestDatabase::new(db_url.to_string());
277    if reset_database {
278        test_db.recreate();
279    }
280
281    PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics)
282}
283
284fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
285    let pos = db_url.rfind('/').expect("unable to find / in db_url");
286    let old_db_name = &db_url[pos + 1..];
287
288    (
289        format!("{}/{}", &db_url[..pos], new_db_name),
290        old_db_name.to_string(),
291    )
292}
293
294pub async fn force_delete_database(db_url: String) {
295    // Replace the database name with the default `postgres`, which should be the
296    // last string after `/` This is necessary because you can't drop a database
297    // while being connected to it. Hence switch to the default `postgres`
298    // database to drop the active database.
299    let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
300    let mut pool_config = ConnectionPoolConfig::default();
301    pool_config.set_pool_size(1);
302
303    let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
304    blocking_pool
305        .get()
306        .unwrap()
307        .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
308        .unwrap();
309}
310
311#[derive(Clone)]
312pub struct IotaTransactionBlockResponseBuilder<'a> {
313    response: IotaTransactionBlockResponse,
314    full_response: &'a IotaTransactionBlockResponse,
315}
316
317impl<'a> IotaTransactionBlockResponseBuilder<'a> {
318    pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
319        Self {
320            response: IotaTransactionBlockResponse::default(),
321            full_response,
322        }
323    }
324
325    pub fn with_input(mut self) -> Self {
326        self.response = IotaTransactionBlockResponse {
327            transaction: self.full_response.transaction.clone(),
328            ..self.response
329        };
330        self
331    }
332
333    pub fn with_raw_input(mut self) -> Self {
334        self.response = IotaTransactionBlockResponse {
335            raw_transaction: self.full_response.raw_transaction.clone(),
336            ..self.response
337        };
338        self
339    }
340
341    pub fn with_effects(mut self) -> Self {
342        self.response = IotaTransactionBlockResponse {
343            effects: self.full_response.effects.clone(),
344            ..self.response
345        };
346        self
347    }
348
349    pub fn with_events(mut self) -> Self {
350        self.response = IotaTransactionBlockResponse {
351            events: self.full_response.events.clone(),
352            ..self.response
353        };
354        self
355    }
356
357    pub fn with_balance_changes(mut self) -> Self {
358        self.response = IotaTransactionBlockResponse {
359            balance_changes: self.full_response.balance_changes.clone(),
360            ..self.response
361        };
362        self
363    }
364
365    pub fn with_object_changes(mut self) -> Self {
366        self.response = IotaTransactionBlockResponse {
367            object_changes: self.full_response.object_changes.clone(),
368            ..self.response
369        };
370        self
371    }
372
373    pub fn with_input_and_changes(mut self) -> Self {
374        self.response = IotaTransactionBlockResponse {
375            transaction: self.full_response.transaction.clone(),
376            balance_changes: self.full_response.balance_changes.clone(),
377            object_changes: self.full_response.object_changes.clone(),
378            ..self.response
379        };
380        self
381    }
382
383    pub fn build(self) -> IotaTransactionBlockResponse {
384        IotaTransactionBlockResponse {
385            transaction: self.response.transaction,
386            raw_transaction: self.response.raw_transaction,
387            effects: self.response.effects,
388            events: self.response.events,
389            balance_changes: self.response.balance_changes,
390            object_changes: self.response.object_changes,
391            // Use full response for any fields that aren't showable
392            ..self.full_response.clone()
393        }
394    }
395}
396
397/// Returns a database URL for testing purposes.
398/// It uses a default user and password, and connects to a local PostgreSQL
399/// instance.
400pub fn db_url(db_name: &str) -> String {
401    format!("postgres://postgres:postgrespw@localhost:5432/{db_name}")
402}
403
404/// Represents a row count result from a SQL query.
405#[derive(QueryableByName, Debug)]
406pub struct RowCount {
407    #[diesel(sql_type = BigInt)]
408    pub cnt: i64,
409}