iota_indexer/
test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use url::Url;
13
14use crate::{
15    IndexerMetrics,
16    config::{
17        IngestionConfig, IotaNamesOptions, PruningOptions, RetentionConfig, SnapshotLagConfig,
18    },
19    db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
20    errors::IndexerError,
21    indexer::Indexer,
22    store::{PgIndexerAnalyticalStore, PgIndexerStore},
23};
24
/// Type to create hooks to alter initial indexer DB state in tests.
/// Those hooks are meant to be called after DB reset (if it occurs) and before
/// indexer is started.
///
/// The hook receives a reference to the freshly created [`PgIndexerStore`] and
/// may run arbitrary (blocking) DB statements through its connection pool.
///
/// Example:
///
/// ```ignore
/// let emulate_insertion_order_set_earlier_by_optimistic_indexing: DBInitHook =
///     Box::new(move |pg_store: &PgIndexerStore| {
///         transactional_blocking_with_retry!(
///             &pg_store.blocking_cp(),
///             |conn| {
///                 insert_or_ignore_into!(
///                     tx_insertion_order::table,
///                     (
///                         tx_insertion_order::dsl::tx_digest.eq(digest.inner().to_vec()),
///                         tx_insertion_order::dsl::insertion_order.eq(123),
///                     ),
///                     conn
///                 );
///                 Ok::<(), IndexerError>(())
///             },
///             Duration::from_secs(60)
///         )
///             .unwrap()
///     });
///
/// let (_, pg_store, _) = start_simulacrum_grpc_with_write_indexer(
///     Arc::new(sim),
///     data_ingestion_path,
///     None,
///     Some("indexer_ingestion_tests_db"),
///     Some(emulate_insertion_order_set_earlier_by_optimistic_indexing),
/// )
/// .await;
/// ```
pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
62
/// Selects which indexer role to start in tests.
pub enum IndexerTypeConfig {
    /// JSON-RPC reader serving queries from the database.
    Reader {
        /// Address the reader's JSON-RPC server binds to.
        reader_mode_rpc_url: String,
    },
    /// Checkpoint-ingesting writer that populates the database.
    Writer {
        snapshot_config: SnapshotLagConfig,
        /// Optional retention settings; `None` disables pruning.
        retention_config: Option<RetentionConfig>,
    },
    /// Worker started via `Indexer::start_analytical_worker`.
    AnalyticalWorker,
}
73
74impl IndexerTypeConfig {
75    pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
76        Self::Reader {
77            reader_mode_rpc_url,
78        }
79    }
80
81    pub fn writer_mode(
82        snapshot_config: Option<SnapshotLagConfig>,
83        pruning_options: Option<PruningOptions>,
84    ) -> Self {
85        Self::Writer {
86            snapshot_config: snapshot_config.unwrap_or_default(),
87            retention_config: pruning_options.as_ref().and_then(|pruning_options| {
88                pruning_options
89                    .epochs_to_keep
90                    .map(RetentionConfig::new_with_default_retention_only_for_testing)
91            }),
92        }
93    }
94}
95
96pub async fn start_test_indexer(
97    db_url: String,
98    reset_db: bool,
99    db_init_hook: Option<DBInitHook>,
100    rpc_url: String,
101    reader_writer_config: IndexerTypeConfig,
102    data_ingestion_path: Option<PathBuf>,
103) -> (
104    PgIndexerStore,
105    JoinHandle<Result<(), IndexerError>>,
106    CancellationToken,
107) {
108    let token = CancellationToken::new();
109    let (store, handle) = start_test_indexer_impl(
110        db_url,
111        reset_db,
112        db_init_hook,
113        rpc_url,
114        reader_writer_config,
115        data_ingestion_path,
116        token.clone(),
117    )
118    .await;
119    (store, handle, token)
120}
121
/// Starts an indexer reader or writer for testing depending on the
/// `reader_writer_config`.
///
/// Returns the store backing the indexer and the handle of the spawned
/// indexer task; the task is cancelled via `cancel` (writer) or runs until
/// aborted (reader/analytical worker, which get their own fresh token or
/// none at all).
///
/// # Note
/// For [`IndexerTypeConfig::Writer`] when `data_ingestion_path` is `Some`, the
/// data ingestion path will be exclusively used to ingest data into the
/// indexer. To force the indexer to sync from the fullnode via gRPC, set
/// `data_ingestion_path` to `None` and it will use the `rpc_url` to stream
/// checkpoints from the fullnode gRPC endpoint.
pub async fn start_test_indexer_impl(
    db_url: String,
    reset_db: bool,
    db_init_hook: Option<DBInitHook>,
    rpc_url: String,
    reader_writer_config: IndexerTypeConfig,
    data_ingestion_path: Option<PathBuf>,
    cancel: CancellationToken,
) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
    // Recreates the database server-side when `reset_db` is set (see
    // `create_pg_store`), then re-initializes its contents below.
    let store = create_pg_store(&db_url, reset_db);
    if reset_db {
        // NOTE(review): presumably re-applies the schema/migrations on the
        // freshly recreated database — confirm against `crate::db::reset_database`.
        crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
    }
    // Let the test seed initial DB state before the indexer starts.
    if let Some(db_init_hook) = db_init_hook {
        db_init_hook(&store);
    }

    // Fresh registry per call so test metrics don't collide.
    let registry = prometheus::Registry::default();
    init_metrics(&registry);
    let indexer_metrics = IndexerMetrics::new(&registry);

    let handle = match reader_writer_config {
        IndexerTypeConfig::Reader {
            reader_mode_rpc_url,
        } => {
            // Reader binds its JSON-RPC server to `reader_mode_rpc_url` and
            // proxies/queries against the fullnode at `rpc_url`.
            let config = crate::config::JsonRpcConfig {
                iota_names_options: IotaNamesOptions::default(),
                historic_fallback_options: Default::default(),
                rpc_address: reader_mode_rpc_url.parse().unwrap(),
                rpc_client_url: rpc_url,
            };
            let pool = store.blocking_cp();
            let store_clone = store.clone();
            tokio::spawn(async move {
                Indexer::start_reader(
                    &config,
                    store_clone,
                    &registry,
                    pool,
                    indexer_metrics,
                    // Reader gets its own token; the caller's `cancel` is not
                    // used in this branch.
                    CancellationToken::new(),
                )
                .await
            })
        }
        IndexerTypeConfig::Writer {
            snapshot_config,
            retention_config,
        } => {
            let fullnode_rpc_url = rpc_url.parse::<Url>().unwrap();
            let store_clone = store.clone();
            let mut ingestion_config = IngestionConfig::default();
            // Exactly one ingestion source: the local path when provided,
            // otherwise the fullnode gRPC endpoint (see the doc note above).
            ingestion_config.sources.remote_store_url =
                data_ingestion_path.is_none().then_some(fullnode_rpc_url);
            ingestion_config.sources.data_ingestion_path = data_ingestion_path;

            tokio::spawn(async move {
                Indexer::start_writer_with_config(
                    &ingestion_config,
                    store_clone,
                    indexer_metrics,
                    snapshot_config,
                    retention_config,
                    cancel,
                )
                .await
            })
        }
        IndexerTypeConfig::AnalyticalWorker => {
            // Analytical worker wraps the same connection pool in its own
            // store type; no cancellation token is threaded through here.
            let store = PgIndexerAnalyticalStore::new(store.blocking_cp());

            tokio::spawn(
                async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
            )
        }
    };

    (store, handle)
}
210
/// Manage a test database for integration tests.
pub struct TestDatabase {
    /// Full connection URL of the managed test database.
    pub url: String,
    // Database name, i.e. the segment after the last `/` in `url`.
    db_name: String,
    // Connection to the server's default `postgres` database, used to
    // create/drop the test database from outside it.
    connection: PoolConnection,
    // Pool settings reused by `to_connection_pool` (pool size capped for tests).
    pool_config: ConnectionPoolConfig,
}
218
219impl TestDatabase {
220    pub fn new(db_url: String) -> Self {
221        // Reduce the connection pool size to 5 for testing
222        // to prevent maxing out
223        let pool_config = ConnectionPoolConfig {
224            pool_size: 5,
225            ..Default::default()
226        };
227
228        let db_name = db_url.split('/').next_back().unwrap().into();
229        let (default_url, _) = replace_db_name(&db_url, "postgres");
230        let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
231        let connection = blocking_pool.get().unwrap();
232        Self {
233            url: db_url,
234            db_name,
235            connection,
236            pool_config,
237        }
238    }
239
240    /// Drop the database in the server if it exists.
241    pub fn drop_if_exists(&mut self) {
242        self.connection
243            .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
244            .unwrap();
245    }
246
247    /// Create the database in the server.
248    pub fn create(&mut self) {
249        self.connection
250            .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
251            .unwrap();
252    }
253
254    /// Drop and recreate the database in the server.
255    pub fn recreate(&mut self) {
256        self.drop_if_exists();
257        self.create();
258    }
259
260    /// Create a new connection pool to the database.
261    pub fn to_connection_pool(&self) -> ConnectionPool {
262        new_connection_pool(&self.url, &self.pool_config).unwrap()
263    }
264
265    pub fn reset_db(&mut self) {
266        crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
267    }
268}
269
270pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
271    let registry = prometheus::Registry::default();
272    init_metrics(&registry);
273    let indexer_metrics = IndexerMetrics::new(&registry);
274
275    let mut test_db = TestDatabase::new(db_url.to_string());
276    if reset_database {
277        test_db.recreate();
278    }
279
280    PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics)
281}
282
/// Swap the database name (the segment after the last `/`) in `db_url` for
/// `new_db_name`.
///
/// Returns `(rewritten_url, old_db_name)`. Panics when `db_url` contains no
/// `/` at all.
fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
    let (base, old_db_name) = db_url
        .rsplit_once('/')
        .expect("unable to find / in db_url");

    (
        format!("{base}/{new_db_name}"),
        old_db_name.to_string(),
    )
}
292
293pub async fn force_delete_database(db_url: String) {
294    // Replace the database name with the default `postgres`, which should be the
295    // last string after `/` This is necessary because you can't drop a database
296    // while being connected to it. Hence switch to the default `postgres`
297    // database to drop the active database.
298    let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
299    let mut pool_config = ConnectionPoolConfig::default();
300    pool_config.set_pool_size(1);
301
302    let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
303    blocking_pool
304        .get()
305        .unwrap()
306        .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
307        .unwrap();
308}
309
/// Builder that copies selected fields from a full
/// [`IotaTransactionBlockResponse`] into an otherwise-default response.
#[derive(Clone)]
pub struct IotaTransactionBlockResponseBuilder<'a> {
    // Partial response being assembled; starts out as `Default`.
    response: IotaTransactionBlockResponse,
    // Source response the selected fields are cloned from.
    full_response: &'a IotaTransactionBlockResponse,
}
315
316impl<'a> IotaTransactionBlockResponseBuilder<'a> {
317    pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
318        Self {
319            response: IotaTransactionBlockResponse::default(),
320            full_response,
321        }
322    }
323
324    pub fn with_input(mut self) -> Self {
325        self.response = IotaTransactionBlockResponse {
326            transaction: self.full_response.transaction.clone(),
327            ..self.response
328        };
329        self
330    }
331
332    pub fn with_raw_input(mut self) -> Self {
333        self.response = IotaTransactionBlockResponse {
334            raw_transaction: self.full_response.raw_transaction.clone(),
335            ..self.response
336        };
337        self
338    }
339
340    pub fn with_effects(mut self) -> Self {
341        self.response = IotaTransactionBlockResponse {
342            effects: self.full_response.effects.clone(),
343            ..self.response
344        };
345        self
346    }
347
348    pub fn with_events(mut self) -> Self {
349        self.response = IotaTransactionBlockResponse {
350            events: self.full_response.events.clone(),
351            ..self.response
352        };
353        self
354    }
355
356    pub fn with_balance_changes(mut self) -> Self {
357        self.response = IotaTransactionBlockResponse {
358            balance_changes: self.full_response.balance_changes.clone(),
359            ..self.response
360        };
361        self
362    }
363
364    pub fn with_object_changes(mut self) -> Self {
365        self.response = IotaTransactionBlockResponse {
366            object_changes: self.full_response.object_changes.clone(),
367            ..self.response
368        };
369        self
370    }
371
372    pub fn with_input_and_changes(mut self) -> Self {
373        self.response = IotaTransactionBlockResponse {
374            transaction: self.full_response.transaction.clone(),
375            balance_changes: self.full_response.balance_changes.clone(),
376            object_changes: self.full_response.object_changes.clone(),
377            ..self.response
378        };
379        self
380    }
381
382    pub fn build(self) -> IotaTransactionBlockResponse {
383        IotaTransactionBlockResponse {
384            transaction: self.response.transaction,
385            raw_transaction: self.response.raw_transaction,
386            effects: self.response.effects,
387            events: self.response.events,
388            balance_changes: self.response.balance_changes,
389            object_changes: self.response.object_changes,
390            // Use full response for any fields that aren't showable
391            ..self.full_response.clone()
392        }
393    }
394}
395
/// Returns a database URL for testing purposes.
/// It uses a default user and password, and connects to a local PostgreSQL
/// instance.
pub fn db_url(db_name: &str) -> String {
    let mut url = String::from("postgres://postgres:postgrespw@localhost:5432/");
    url.push_str(db_name);
    url
}
402
/// Represents a row count result from a SQL query.
///
/// Deserialize with raw queries shaped like `SELECT COUNT(*) AS cnt ...`.
#[derive(QueryableByName, Debug)]
pub struct RowCount {
    /// Value of the `cnt` column.
    #[diesel(sql_type = BigInt)]
    pub cnt: i64,
}