iota_indexer/
test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::{
14    IndexerMetrics,
15    config::{IngestionConfig, IotaNamesOptions, RetentionConfig, SnapshotLagConfig},
16    db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
17    errors::IndexerError,
18    indexer::Indexer,
19    store::{PgIndexerAnalyticalStore, PgIndexerStore},
20};
21
/// One-shot hook used by tests to alter the initial indexer DB state.
///
/// The hook receives a shared reference to the [`PgIndexerStore`] and is
/// consumed when called (`FnOnce`). It must be `Send`.
/// `start_test_indexer_impl` invokes it after the optional DB reset and
/// before the indexer task is spawned.
///
/// Example:
///
/// ```ignore
/// let emulate_insertion_order_set_earlier_by_optimistic_indexing: DBInitHook =
///     Box::new(move |pg_store: &PgIndexerStore| {
///         transactional_blocking_with_retry!(
///             &pg_store.blocking_cp(),
///             |conn| {
///                 insert_or_ignore_into!(
///                     tx_insertion_order::table,
///                     (
///                         tx_insertion_order::dsl::tx_digest.eq(digest.inner().to_vec()),
///                         tx_insertion_order::dsl::insertion_order.eq(123),
///                     ),
///                     conn
///                 );
///                 Ok::<(), IndexerError>(())
///             },
///             Duration::from_secs(60)
///         )
///             .unwrap()
///     });
///
/// let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
///     Arc::new(sim),
///     data_ingestion_path,
///     None,
///     Some("indexer_ingestion_tests_db"),
///     Some(emulate_insertion_order_set_earlier_by_optimistic_indexing),
/// )
/// .await;
/// ```
pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
59
/// Selects which kind of indexer the test helpers start.
pub enum IndexerTypeConfig {
    /// Start the indexer via `Indexer::start_reader`.
    Reader {
        /// Parsed into the `rpc_address` of the reader's `JsonRpcConfig`.
        reader_mode_rpc_url: String,
    },
    /// Start the indexer via `Indexer::start_writer_with_config`.
    Writer {
        /// Snapshot lag configuration forwarded to the writer.
        snapshot_config: SnapshotLagConfig,
        /// Optional retention configuration forwarded to the writer.
        retention_config: Option<RetentionConfig>,
    },
    /// Start the indexer via `Indexer::start_analytical_worker`.
    AnalyticalWorker,
}
70
71impl IndexerTypeConfig {
72    pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
73        Self::Reader {
74            reader_mode_rpc_url,
75        }
76    }
77
78    pub fn writer_mode(
79        snapshot_config: Option<SnapshotLagConfig>,
80        epochs_to_keep: Option<u64>,
81    ) -> Self {
82        Self::Writer {
83            snapshot_config: snapshot_config.unwrap_or_default(),
84            retention_config: epochs_to_keep
85                .map(RetentionConfig::new_with_default_retention_only_for_testing),
86        }
87    }
88}
89
90pub async fn start_test_indexer(
91    db_url: String,
92    reset_db: bool,
93    db_init_hook: Option<DBInitHook>,
94    rpc_url: String,
95    reader_writer_config: IndexerTypeConfig,
96    data_ingestion_path: Option<PathBuf>,
97) -> (
98    PgIndexerStore,
99    JoinHandle<Result<(), IndexerError>>,
100    CancellationToken,
101) {
102    let token = CancellationToken::new();
103    let (store, handle) = start_test_indexer_impl(
104        db_url,
105        reset_db,
106        db_init_hook,
107        rpc_url,
108        reader_writer_config,
109        data_ingestion_path,
110        token.clone(),
111    )
112    .await;
113    (store, handle, token)
114}
115
116/// Starts an indexer reader or writer for testing depending on the
117/// `reader_writer_config`.
118pub async fn start_test_indexer_impl(
119    db_url: String,
120    reset_db: bool,
121    db_init_hook: Option<DBInitHook>,
122    rpc_url: String,
123    reader_writer_config: IndexerTypeConfig,
124    data_ingestion_path: Option<PathBuf>,
125    cancel: CancellationToken,
126) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
127    let store = create_pg_store(&db_url, reset_db);
128    if reset_db {
129        crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
130    }
131    if let Some(db_init_hook) = db_init_hook {
132        db_init_hook(&store);
133    }
134
135    let registry = prometheus::Registry::default();
136    init_metrics(&registry);
137    let indexer_metrics = IndexerMetrics::new(&registry);
138
139    let handle = match reader_writer_config {
140        IndexerTypeConfig::Reader {
141            reader_mode_rpc_url,
142        } => {
143            let config = crate::config::JsonRpcConfig {
144                iota_names_options: IotaNamesOptions::default(),
145                rpc_address: reader_mode_rpc_url.parse().unwrap(),
146                rpc_client_url: rpc_url,
147            };
148            let pool = store.blocking_cp();
149            let store_clone = store.clone();
150            tokio::spawn(async move {
151                Indexer::start_reader(&config, store_clone, &registry, pool, indexer_metrics).await
152            })
153        }
154        IndexerTypeConfig::Writer {
155            snapshot_config,
156            retention_config,
157        } => {
158            let store_clone = store.clone();
159            let mut ingestion_config = IngestionConfig::default();
160            ingestion_config.sources.remote_store_url = data_ingestion_path
161                .is_none()
162                .then_some(format!("{rpc_url}/api/v1").parse().unwrap());
163            ingestion_config.sources.data_ingestion_path = data_ingestion_path;
164            ingestion_config.sources.rpc_client_url = Some(rpc_url.parse().unwrap());
165
166            tokio::spawn(async move {
167                Indexer::start_writer_with_config(
168                    &ingestion_config,
169                    store_clone,
170                    indexer_metrics,
171                    snapshot_config,
172                    retention_config,
173                    cancel,
174                )
175                .await
176            })
177        }
178        IndexerTypeConfig::AnalyticalWorker => {
179            let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
180
181            tokio::spawn(
182                async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
183            )
184        }
185    };
186
187    (store, handle)
188}
189
/// Manage a test database for integration tests.
pub struct TestDatabase {
    /// Full URL of the managed test database.
    pub url: String,
    /// Name of the managed database: the segment after the last `/` in `url`.
    db_name: String,
    /// Connection to the `postgres` database on the same server, used to run
    /// the `DROP DATABASE` / `CREATE DATABASE` statements.
    connection: PoolConnection,
    /// Pool settings used when opening connection pools to the test database.
    pool_config: ConnectionPoolConfig,
}
197
198impl TestDatabase {
199    pub fn new(db_url: String) -> Self {
200        // Reduce the connection pool size to 5 for testing
201        // to prevent maxing out
202        let pool_config = ConnectionPoolConfig {
203            pool_size: 5,
204            ..Default::default()
205        };
206
207        let db_name = db_url.split('/').next_back().unwrap().into();
208        let (default_url, _) = replace_db_name(&db_url, "postgres");
209        let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
210        let connection = blocking_pool.get().unwrap();
211        Self {
212            url: db_url,
213            db_name,
214            connection,
215            pool_config,
216        }
217    }
218
219    /// Drop the database in the server if it exists.
220    pub fn drop_if_exists(&mut self) {
221        self.connection
222            .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
223            .unwrap();
224    }
225
226    /// Create the database in the server.
227    pub fn create(&mut self) {
228        self.connection
229            .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
230            .unwrap();
231    }
232
233    /// Drop and recreate the database in the server.
234    pub fn recreate(&mut self) {
235        self.drop_if_exists();
236        self.create();
237    }
238
239    /// Create a new connection pool to the database.
240    pub fn to_connection_pool(&self) -> ConnectionPool {
241        new_connection_pool(&self.url, &self.pool_config).unwrap()
242    }
243
244    pub fn reset_db(&mut self) {
245        crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
246    }
247}
248
249pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
250    let registry = prometheus::Registry::default();
251    init_metrics(&registry);
252    let indexer_metrics = IndexerMetrics::new(&registry);
253
254    let mut test_db = TestDatabase::new(db_url.to_string());
255    if reset_database {
256        test_db.recreate();
257    }
258
259    PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
260}
261
/// Swaps the database name in `db_url` for `new_db_name`.
///
/// The database name is taken to be everything after the last `/`. Returns
/// the rewritten URL together with the database name that was replaced.
///
/// # Panics
///
/// Panics if `db_url` contains no `/`.
fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
    let (prefix, old_db_name) = db_url
        .rsplit_once('/')
        .expect("Unable to find / in db_url");

    (format!("{prefix}/{new_db_name}"), old_db_name.to_owned())
}
271
272pub async fn force_delete_database(db_url: String) {
273    // Replace the database name with the default `postgres`, which should be the
274    // last string after `/` This is necessary because you can't drop a database
275    // while being connected to it. Hence switch to the default `postgres`
276    // database to drop the active database.
277    let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
278    let mut pool_config = ConnectionPoolConfig::default();
279    pool_config.set_pool_size(1);
280
281    let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
282    blocking_pool
283        .get()
284        .unwrap()
285        .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
286        .unwrap();
287}
288
/// Builder that assembles an [`IotaTransactionBlockResponse`] carrying only
/// selected parts of an existing full response.
#[derive(Clone)]
pub struct IotaTransactionBlockResponseBuilder<'a> {
    /// Response being assembled; starts as
    /// `IotaTransactionBlockResponse::default()`.
    response: IotaTransactionBlockResponse,
    /// Complete response that the selected fields are cloned from.
    full_response: &'a IotaTransactionBlockResponse,
}
294
295impl<'a> IotaTransactionBlockResponseBuilder<'a> {
296    pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
297        Self {
298            response: IotaTransactionBlockResponse::default(),
299            full_response,
300        }
301    }
302
303    pub fn with_input(mut self) -> Self {
304        self.response = IotaTransactionBlockResponse {
305            transaction: self.full_response.transaction.clone(),
306            ..self.response
307        };
308        self
309    }
310
311    pub fn with_raw_input(mut self) -> Self {
312        self.response = IotaTransactionBlockResponse {
313            raw_transaction: self.full_response.raw_transaction.clone(),
314            ..self.response
315        };
316        self
317    }
318
319    pub fn with_effects(mut self) -> Self {
320        self.response = IotaTransactionBlockResponse {
321            effects: self.full_response.effects.clone(),
322            ..self.response
323        };
324        self
325    }
326
327    pub fn with_events(mut self) -> Self {
328        self.response = IotaTransactionBlockResponse {
329            events: self.full_response.events.clone(),
330            ..self.response
331        };
332        self
333    }
334
335    pub fn with_balance_changes(mut self) -> Self {
336        self.response = IotaTransactionBlockResponse {
337            balance_changes: self.full_response.balance_changes.clone(),
338            ..self.response
339        };
340        self
341    }
342
343    pub fn with_object_changes(mut self) -> Self {
344        self.response = IotaTransactionBlockResponse {
345            object_changes: self.full_response.object_changes.clone(),
346            ..self.response
347        };
348        self
349    }
350
351    pub fn with_input_and_changes(mut self) -> Self {
352        self.response = IotaTransactionBlockResponse {
353            transaction: self.full_response.transaction.clone(),
354            balance_changes: self.full_response.balance_changes.clone(),
355            object_changes: self.full_response.object_changes.clone(),
356            ..self.response
357        };
358        self
359    }
360
361    pub fn build(self) -> IotaTransactionBlockResponse {
362        IotaTransactionBlockResponse {
363            transaction: self.response.transaction,
364            raw_transaction: self.response.raw_transaction,
365            effects: self.response.effects,
366            events: self.response.events,
367            balance_changes: self.response.balance_changes,
368            object_changes: self.response.object_changes,
369            // Use full response for any fields that aren't showable
370            ..self.full_response.clone()
371        }
372    }
373}
374
/// Builds the URL of the test database named `db_name`.
///
/// The URL points at a PostgreSQL instance on `localhost:5432` and uses the
/// fixed test credentials `postgres` / `postgrespw`.
pub fn db_url(db_name: &str) -> String {
    const LOCAL_SERVER: &str = "postgres://postgres:postgrespw@localhost:5432/";
    [LOCAL_SERVER, db_name].concat()
}
381
/// Represents a row count result from a SQL query.
///
/// Derives `QueryableByName`, so it is meant to be loaded from a raw SQL
/// query — presumably one that exposes a column named `cnt`; verify against
/// callers.
#[derive(QueryableByName, Debug)]
pub struct RowCount {
    /// The count, read as a SQL `BigInt`.
    #[diesel(sql_type = BigInt)]
    pub cnt: i64,
}