iota_indexer/
test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::PathBuf;
6
7use diesel::connection::SimpleConnection;
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::{
14    IndexerMetrics,
15    config::{IngestionConfig, IotaNamesOptions, PruningOptions, SnapshotLagConfig},
16    db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
17    errors::IndexerError,
18    indexer::Indexer,
19    store::{PgIndexerAnalyticalStore, PgIndexerStore},
20};
21
22/// Type to create hooks to alter initial indexer DB state in tests.
23/// Those hooks are meant to be called after DB reset (if it occurs) and before
24/// indexer is started.
25///
26/// Example:
27///
28/// ```ignore
29/// let emulate_insertion_order_set_earlier_by_optimistic_indexing: DBInitHook =
30///     Box::new(move |pg_store: &PgIndexerStore| {
31///         transactional_blocking_with_retry!(
32///             &pg_store.blocking_cp(),
33///             |conn| {
34///                 insert_or_ignore_into!(
35///                     tx_insertion_order::table,
36///                     (
37///                         tx_insertion_order::dsl::tx_digest.eq(digest.inner().to_vec()),
38///                         tx_insertion_order::dsl::insertion_order.eq(123),
39///                     ),
40///                     conn
41///                 );
42///                 Ok::<(), IndexerError>(())
43///             },
44///             Duration::from_secs(60)
45///         )
46///             .unwrap()
47///     });
48///
49/// let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
50///     Arc::new(sim),
51///     data_ingestion_path,
52///     None,
53///     Some("indexer_ingestion_tests_db"),
54///     Some(emulate_insertion_order_set_earlier_by_optimistic_indexing),
55/// )
56/// .await;
57/// ```
58pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
59
60pub enum IndexerTypeConfig {
61    Reader {
62        reader_mode_rpc_url: String,
63    },
64    Writer {
65        snapshot_config: SnapshotLagConfig,
66        pruning_options: PruningOptions,
67    },
68    AnalyticalWorker,
69}
70
71impl IndexerTypeConfig {
72    pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
73        Self::Reader {
74            reader_mode_rpc_url,
75        }
76    }
77
78    pub fn writer_mode(
79        snapshot_config: Option<SnapshotLagConfig>,
80        epochs_to_keep: Option<u64>,
81    ) -> Self {
82        Self::Writer {
83            snapshot_config: snapshot_config.unwrap_or_default(),
84            pruning_options: PruningOptions { epochs_to_keep },
85        }
86    }
87}
88
89pub async fn start_test_indexer(
90    db_url: String,
91    reset_db: bool,
92    db_init_hook: Option<DBInitHook>,
93    rpc_url: String,
94    reader_writer_config: IndexerTypeConfig,
95    data_ingestion_path: Option<PathBuf>,
96) -> (
97    PgIndexerStore,
98    JoinHandle<Result<(), IndexerError>>,
99    CancellationToken,
100) {
101    let token = CancellationToken::new();
102    let (store, handle) = start_test_indexer_impl(
103        db_url,
104        reset_db,
105        db_init_hook,
106        rpc_url,
107        reader_writer_config,
108        data_ingestion_path,
109        token.clone(),
110    )
111    .await;
112    (store, handle, token)
113}
114
115/// Starts an indexer reader or writer for testing depending on the
116/// `reader_writer_config`.
117pub async fn start_test_indexer_impl(
118    db_url: String,
119    reset_db: bool,
120    db_init_hook: Option<DBInitHook>,
121    rpc_url: String,
122    reader_writer_config: IndexerTypeConfig,
123    data_ingestion_path: Option<PathBuf>,
124    cancel: CancellationToken,
125) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
126    let store = create_pg_store(&db_url, reset_db);
127    if reset_db {
128        crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
129    }
130    if let Some(db_init_hook) = db_init_hook {
131        db_init_hook(&store);
132    }
133
134    let registry = prometheus::Registry::default();
135    let handle = match reader_writer_config {
136        IndexerTypeConfig::Reader {
137            reader_mode_rpc_url,
138        } => {
139            let config = crate::config::JsonRpcConfig {
140                iota_names_options: IotaNamesOptions::default(),
141                rpc_address: reader_mode_rpc_url.parse().unwrap(),
142                rpc_client_url: rpc_url,
143            };
144            let pool = store.blocking_cp();
145            tokio::spawn(async move { Indexer::start_reader(&config, &registry, pool).await })
146        }
147        IndexerTypeConfig::Writer {
148            snapshot_config,
149            pruning_options,
150        } => {
151            let store_clone = store.clone();
152            let mut ingestion_config = IngestionConfig::default();
153            ingestion_config.sources.remote_store_url = data_ingestion_path
154                .is_none()
155                .then_some(format!("{rpc_url}/api/v1").parse().unwrap());
156            ingestion_config.sources.data_ingestion_path = data_ingestion_path;
157            ingestion_config.sources.rpc_client_url = Some(rpc_url.parse().unwrap());
158
159            init_metrics(&registry);
160            let indexer_metrics = IndexerMetrics::new(&registry);
161
162            tokio::spawn(async move {
163                Indexer::start_writer_with_config(
164                    &ingestion_config,
165                    store_clone,
166                    indexer_metrics,
167                    snapshot_config,
168                    pruning_options,
169                    cancel,
170                )
171                .await
172            })
173        }
174        IndexerTypeConfig::AnalyticalWorker => {
175            let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
176
177            init_metrics(&registry);
178            let indexer_metrics = IndexerMetrics::new(&registry);
179
180            tokio::spawn(
181                async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
182            )
183        }
184    };
185
186    (store, handle)
187}
188
189/// Manage a test database for integration tests.
190pub struct TestDatabase {
191    pub url: String,
192    db_name: String,
193    connection: PoolConnection,
194    pool_config: ConnectionPoolConfig,
195}
196
197impl TestDatabase {
198    pub fn new(db_url: String) -> Self {
199        // Reduce the connection pool size to 5 for testing
200        // to prevent maxing out
201        let pool_config = ConnectionPoolConfig {
202            pool_size: 5,
203            ..Default::default()
204        };
205
206        let db_name = db_url.split('/').next_back().unwrap().into();
207        let (default_url, _) = replace_db_name(&db_url, "postgres");
208        let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
209        let connection = blocking_pool.get().unwrap();
210        Self {
211            url: db_url,
212            db_name,
213            connection,
214            pool_config,
215        }
216    }
217
218    /// Drop the database in the server if it exists.
219    pub fn drop_if_exists(&mut self) {
220        self.connection
221            .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
222            .unwrap();
223    }
224
225    /// Create the database in the server.
226    pub fn create(&mut self) {
227        self.connection
228            .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
229            .unwrap();
230    }
231
232    /// Drop and recreate the database in the server.
233    pub fn recreate(&mut self) {
234        self.drop_if_exists();
235        self.create();
236    }
237
238    /// Create a new connection pool to the database.
239    pub fn to_connection_pool(&self) -> ConnectionPool {
240        new_connection_pool(&self.url, &self.pool_config).unwrap()
241    }
242
243    pub fn reset_db(&mut self) {
244        crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
245    }
246}
247
248pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
249    let registry = prometheus::Registry::default();
250    init_metrics(&registry);
251    let indexer_metrics = IndexerMetrics::new(&registry);
252
253    let mut test_db = TestDatabase::new(db_url.to_string());
254    if reset_database {
255        test_db.recreate();
256    }
257
258    PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
259}
260
261fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
262    let pos = db_url.rfind('/').expect("Unable to find / in db_url");
263    let old_db_name = &db_url[pos + 1..];
264
265    (
266        format!("{}/{}", &db_url[..pos], new_db_name),
267        old_db_name.to_string(),
268    )
269}
270
271pub async fn force_delete_database(db_url: String) {
272    // Replace the database name with the default `postgres`, which should be the
273    // last string after `/` This is necessary because you can't drop a database
274    // while being connected to it. Hence switch to the default `postgres`
275    // database to drop the active database.
276    let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
277    let mut pool_config = ConnectionPoolConfig::default();
278    pool_config.set_pool_size(1);
279
280    let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
281    blocking_pool
282        .get()
283        .unwrap()
284        .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
285        .unwrap();
286}
287
288#[derive(Clone)]
289pub struct IotaTransactionBlockResponseBuilder<'a> {
290    response: IotaTransactionBlockResponse,
291    full_response: &'a IotaTransactionBlockResponse,
292}
293
294impl<'a> IotaTransactionBlockResponseBuilder<'a> {
295    pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
296        Self {
297            response: IotaTransactionBlockResponse::default(),
298            full_response,
299        }
300    }
301
302    pub fn with_input(mut self) -> Self {
303        self.response = IotaTransactionBlockResponse {
304            transaction: self.full_response.transaction.clone(),
305            ..self.response
306        };
307        self
308    }
309
310    pub fn with_raw_input(mut self) -> Self {
311        self.response = IotaTransactionBlockResponse {
312            raw_transaction: self.full_response.raw_transaction.clone(),
313            ..self.response
314        };
315        self
316    }
317
318    pub fn with_effects(mut self) -> Self {
319        self.response = IotaTransactionBlockResponse {
320            effects: self.full_response.effects.clone(),
321            ..self.response
322        };
323        self
324    }
325
326    pub fn with_events(mut self) -> Self {
327        self.response = IotaTransactionBlockResponse {
328            events: self.full_response.events.clone(),
329            ..self.response
330        };
331        self
332    }
333
334    pub fn with_balance_changes(mut self) -> Self {
335        self.response = IotaTransactionBlockResponse {
336            balance_changes: self.full_response.balance_changes.clone(),
337            ..self.response
338        };
339        self
340    }
341
342    pub fn with_object_changes(mut self) -> Self {
343        self.response = IotaTransactionBlockResponse {
344            object_changes: self.full_response.object_changes.clone(),
345            ..self.response
346        };
347        self
348    }
349
350    pub fn with_input_and_changes(mut self) -> Self {
351        self.response = IotaTransactionBlockResponse {
352            transaction: self.full_response.transaction.clone(),
353            balance_changes: self.full_response.balance_changes.clone(),
354            object_changes: self.full_response.object_changes.clone(),
355            ..self.response
356        };
357        self
358    }
359
360    pub fn build(self) -> IotaTransactionBlockResponse {
361        IotaTransactionBlockResponse {
362            transaction: self.response.transaction,
363            raw_transaction: self.response.raw_transaction,
364            effects: self.response.effects,
365            events: self.response.events,
366            balance_changes: self.response.balance_changes,
367            object_changes: self.response.object_changes,
368            // Use full response for any fields that aren't showable
369            ..self.full_response.clone()
370        }
371    }
372}