// iota_indexer/test_utils.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::{
14    IndexerMetrics,
15    config::{
16        IngestionConfig, IotaNamesOptions, PruningOptions, RetentionConfig, SnapshotLagConfig,
17    },
18    db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
19    errors::IndexerError,
20    indexer::Indexer,
21    store::{PgIndexerAnalyticalStore, PgIndexerStore},
22};
23
/// Type to create hooks to alter initial indexer DB state in tests.
/// Those hooks are meant to be called after DB reset (if it occurs) and before
/// the indexer is started, so tests can seed rows the indexer will observe.
///
/// Example:
///
/// ```ignore
/// let emulate_insertion_order_set_earlier_by_optimistic_indexing: DBInitHook =
///     Box::new(move |pg_store: &PgIndexerStore| {
///         transactional_blocking_with_retry!(
///             &pg_store.blocking_cp(),
///             |conn| {
///                 insert_or_ignore_into!(
///                     tx_insertion_order::table,
///                     (
///                         tx_insertion_order::dsl::tx_digest.eq(digest.inner().to_vec()),
///                         tx_insertion_order::dsl::insertion_order.eq(123),
///                     ),
///                     conn
///                 );
///                 Ok::<(), IndexerError>(())
///             },
///             Duration::from_secs(60)
///         )
///             .unwrap()
///     });
///
/// let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
///     Arc::new(sim),
///     data_ingestion_path,
///     None,
///     Some("indexer_ingestion_tests_db"),
///     Some(emulate_insertion_order_set_earlier_by_optimistic_indexing),
/// )
/// .await;
/// ```
pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
61
/// Selects which indexer role a test instance runs as.
pub enum IndexerTypeConfig {
    /// Run the JSON-RPC reader, listening on `reader_mode_rpc_url`.
    Reader {
        reader_mode_rpc_url: String,
    },
    /// Run the checkpoint writer, with optional snapshot-lag and pruning
    /// tuning.
    Writer {
        snapshot_config: SnapshotLagConfig,
        retention_config: Option<RetentionConfig>,
        optimistic_pruner_batch_size: Option<u64>,
    },
    /// Run the analytical worker (see `Indexer::start_analytical_worker`).
    AnalyticalWorker,
}
73
74impl IndexerTypeConfig {
75    pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
76        Self::Reader {
77            reader_mode_rpc_url,
78        }
79    }
80
81    pub fn writer_mode(
82        snapshot_config: Option<SnapshotLagConfig>,
83        pruning_options: Option<PruningOptions>,
84    ) -> Self {
85        Self::Writer {
86            snapshot_config: snapshot_config.unwrap_or_default(),
87            retention_config: pruning_options.as_ref().and_then(|pruning_options| {
88                pruning_options
89                    .epochs_to_keep
90                    .map(RetentionConfig::new_with_default_retention_only_for_testing)
91            }),
92            optimistic_pruner_batch_size: pruning_options
93                .and_then(|pruning_options| pruning_options.optimistic_pruner_batch_size),
94        }
95    }
96}
97
98pub async fn start_test_indexer(
99    db_url: String,
100    reset_db: bool,
101    db_init_hook: Option<DBInitHook>,
102    rpc_url: String,
103    reader_writer_config: IndexerTypeConfig,
104    data_ingestion_path: Option<PathBuf>,
105) -> (
106    PgIndexerStore,
107    JoinHandle<Result<(), IndexerError>>,
108    CancellationToken,
109) {
110    let token = CancellationToken::new();
111    let (store, handle) = start_test_indexer_impl(
112        db_url,
113        reset_db,
114        db_init_hook,
115        rpc_url,
116        reader_writer_config,
117        data_ingestion_path,
118        token.clone(),
119    )
120    .await;
121    (store, handle, token)
122}
123
/// Starts an indexer reader or writer for testing depending on the
/// `reader_writer_config`.
///
/// Flow: create the store (recreating the DB when `reset_db`), run the
/// optional `db_init_hook` to seed initial state, then spawn the selected
/// indexer role on a background task and return its join handle.
pub async fn start_test_indexer_impl(
    db_url: String,
    reset_db: bool,
    db_init_hook: Option<DBInitHook>,
    rpc_url: String,
    reader_writer_config: IndexerTypeConfig,
    data_ingestion_path: Option<PathBuf>,
    cancel: CancellationToken,
) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
    let store = create_pg_store(&db_url, reset_db);
    if reset_db {
        // Set up the schema on the freshly recreated database (see
        // `crate::db::reset_database`).
        crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
    }
    // Hooks run after any reset and strictly before the indexer starts.
    if let Some(db_init_hook) = db_init_hook {
        db_init_hook(&store);
    }

    // Fresh registry per call so metric registration cannot collide across
    // multiple indexers in one test process.
    let registry = prometheus::Registry::default();
    init_metrics(&registry);
    let indexer_metrics = IndexerMetrics::new(&registry);

    let handle = match reader_writer_config {
        IndexerTypeConfig::Reader {
            reader_mode_rpc_url,
        } => {
            let config = crate::config::JsonRpcConfig {
                iota_names_options: IotaNamesOptions::default(),
                historic_fallback_options: Default::default(),
                rpc_address: reader_mode_rpc_url.parse().unwrap(),
                rpc_client_url: rpc_url,
            };
            let pool = store.blocking_cp();
            let store_clone = store.clone();
            tokio::spawn(async move {
                Indexer::start_reader(&config, store_clone, &registry, pool, indexer_metrics).await
            })
        }
        IndexerTypeConfig::Writer {
            snapshot_config,
            retention_config,
            optimistic_pruner_batch_size,
        } => {
            let store_clone = store.clone();
            let mut ingestion_config = IngestionConfig::default();
            // Only fall back to the remote store endpoint when no local
            // ingestion path was provided.
            ingestion_config.sources.remote_store_url = data_ingestion_path
                .is_none()
                .then_some(format!("{rpc_url}/api/v1").parse().unwrap());
            ingestion_config.sources.data_ingestion_path = data_ingestion_path;
            ingestion_config.sources.rpc_client_url = Some(rpc_url.parse().unwrap());

            tokio::spawn(async move {
                Indexer::start_writer_with_config(
                    &ingestion_config,
                    store_clone,
                    indexer_metrics,
                    snapshot_config,
                    retention_config,
                    optimistic_pruner_batch_size,
                    cancel,
                )
                .await
            })
        }
        IndexerTypeConfig::AnalyticalWorker => {
            // The analytical worker wraps the same connection pool in its own
            // store type.
            let store = PgIndexerAnalyticalStore::new(store.blocking_cp());

            tokio::spawn(
                async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
            )
        }
    };

    (store, handle)
}
200
/// Manage a test database for integration tests.
pub struct TestDatabase {
    /// Connection URL of the database under test.
    pub url: String,
    // Database name, extracted from the last path segment of `url`.
    db_name: String,
    // Administrative connection to the server's default `postgres` database,
    // used for CREATE/DROP of the test database.
    connection: PoolConnection,
    // Pool config reused when opening pools against the test database.
    pool_config: ConnectionPoolConfig,
}
208
209impl TestDatabase {
210    pub fn new(db_url: String) -> Self {
211        // Reduce the connection pool size to 5 for testing
212        // to prevent maxing out
213        let pool_config = ConnectionPoolConfig {
214            pool_size: 5,
215            ..Default::default()
216        };
217
218        let db_name = db_url.split('/').next_back().unwrap().into();
219        let (default_url, _) = replace_db_name(&db_url, "postgres");
220        let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
221        let connection = blocking_pool.get().unwrap();
222        Self {
223            url: db_url,
224            db_name,
225            connection,
226            pool_config,
227        }
228    }
229
230    /// Drop the database in the server if it exists.
231    pub fn drop_if_exists(&mut self) {
232        self.connection
233            .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
234            .unwrap();
235    }
236
237    /// Create the database in the server.
238    pub fn create(&mut self) {
239        self.connection
240            .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
241            .unwrap();
242    }
243
244    /// Drop and recreate the database in the server.
245    pub fn recreate(&mut self) {
246        self.drop_if_exists();
247        self.create();
248    }
249
250    /// Create a new connection pool to the database.
251    pub fn to_connection_pool(&self) -> ConnectionPool {
252        new_connection_pool(&self.url, &self.pool_config).unwrap()
253    }
254
255    pub fn reset_db(&mut self) {
256        crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
257    }
258}
259
260pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
261    let registry = prometheus::Registry::default();
262    init_metrics(&registry);
263    let indexer_metrics = IndexerMetrics::new(&registry);
264
265    let mut test_db = TestDatabase::new(db_url.to_string());
266    if reset_database {
267        test_db.recreate();
268    }
269
270    PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
271}
272
/// Swap the database name (the segment after the last `/`) in `db_url` for
/// `new_db_name`, returning `(rewritten_url, old_db_name)`.
///
/// Panics if `db_url` contains no `/`.
fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
    let (base, old_db_name) = db_url
        .rsplit_once('/')
        .expect("unable to find / in db_url");

    (format!("{base}/{new_db_name}"), old_db_name.to_string())
}
282
283pub async fn force_delete_database(db_url: String) {
284    // Replace the database name with the default `postgres`, which should be the
285    // last string after `/` This is necessary because you can't drop a database
286    // while being connected to it. Hence switch to the default `postgres`
287    // database to drop the active database.
288    let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
289    let mut pool_config = ConnectionPoolConfig::default();
290    pool_config.set_pool_size(1);
291
292    let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
293    blocking_pool
294        .get()
295        .unwrap()
296        .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
297        .unwrap();
298}
299
/// Builds a partial [`IotaTransactionBlockResponse`] by copying selected
/// fields from a full response via the `with_*` methods.
#[derive(Clone)]
pub struct IotaTransactionBlockResponseBuilder<'a> {
    // The partial response being assembled; starts out as `default()`.
    response: IotaTransactionBlockResponse,
    // Source response that selected fields are copied from.
    full_response: &'a IotaTransactionBlockResponse,
}
305
306impl<'a> IotaTransactionBlockResponseBuilder<'a> {
307    pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
308        Self {
309            response: IotaTransactionBlockResponse::default(),
310            full_response,
311        }
312    }
313
314    pub fn with_input(mut self) -> Self {
315        self.response = IotaTransactionBlockResponse {
316            transaction: self.full_response.transaction.clone(),
317            ..self.response
318        };
319        self
320    }
321
322    pub fn with_raw_input(mut self) -> Self {
323        self.response = IotaTransactionBlockResponse {
324            raw_transaction: self.full_response.raw_transaction.clone(),
325            ..self.response
326        };
327        self
328    }
329
330    pub fn with_effects(mut self) -> Self {
331        self.response = IotaTransactionBlockResponse {
332            effects: self.full_response.effects.clone(),
333            ..self.response
334        };
335        self
336    }
337
338    pub fn with_events(mut self) -> Self {
339        self.response = IotaTransactionBlockResponse {
340            events: self.full_response.events.clone(),
341            ..self.response
342        };
343        self
344    }
345
346    pub fn with_balance_changes(mut self) -> Self {
347        self.response = IotaTransactionBlockResponse {
348            balance_changes: self.full_response.balance_changes.clone(),
349            ..self.response
350        };
351        self
352    }
353
354    pub fn with_object_changes(mut self) -> Self {
355        self.response = IotaTransactionBlockResponse {
356            object_changes: self.full_response.object_changes.clone(),
357            ..self.response
358        };
359        self
360    }
361
362    pub fn with_input_and_changes(mut self) -> Self {
363        self.response = IotaTransactionBlockResponse {
364            transaction: self.full_response.transaction.clone(),
365            balance_changes: self.full_response.balance_changes.clone(),
366            object_changes: self.full_response.object_changes.clone(),
367            ..self.response
368        };
369        self
370    }
371
372    pub fn build(self) -> IotaTransactionBlockResponse {
373        IotaTransactionBlockResponse {
374            transaction: self.response.transaction,
375            raw_transaction: self.response.raw_transaction,
376            effects: self.response.effects,
377            events: self.response.events,
378            balance_changes: self.response.balance_changes,
379            object_changes: self.response.object_changes,
380            // Use full response for any fields that aren't showable
381            ..self.full_response.clone()
382        }
383    }
384}
385
/// Returns a database URL for testing purposes.
/// It uses a default user and password, and connects to a local PostgreSQL
/// instance on port 5432.
pub fn db_url(db_name: &str) -> String {
    let mut url = String::from("postgres://postgres:postgrespw@localhost:5432/");
    url.push_str(db_name);
    url
}
392
/// Represents a row count result from a SQL query.
///
/// Deserialized with diesel's `QueryableByName`, so the query's result column
/// must be named `cnt` (e.g. `SELECT COUNT(*) AS cnt ...`).
#[derive(QueryableByName, Debug)]
pub struct RowCount {
    #[diesel(sql_type = BigInt)]
    pub cnt: i64,
}