iota_indexer/
test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf};
6
7use diesel::connection::SimpleConnection;
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use secrecy::{ExposeSecret, Secret};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tracing::info;
14
15use crate::{
16    IndexerConfig, IndexerMetrics,
17    db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool_with_config},
18    errors::IndexerError,
19    handlers::objects_snapshot_handler::SnapshotLagConfig,
20    indexer::Indexer,
21    store::{PgIndexerAnalyticalStore, PgIndexerStore},
22};
23
/// Type to create hooks to alter initial indexer DB state in tests.
/// Those hooks are meant to be called after DB reset (if it occurs) and before
/// indexer is started. The closure receives the freshly created
/// [`PgIndexerStore`] and runs exactly once.
///
/// Example:
///
/// ```ignore
/// let emulate_insertion_order_set_earlier_by_optimistic_indexing: DBInitHook =
///     Box::new(move |pg_store: &PgIndexerStore| {
///         transactional_blocking_with_retry!(
///             &pg_store.blocking_cp(),
///             |conn| {
///                 insert_or_ignore_into!(
///                     tx_insertion_order::table,
///                     (
///                         tx_insertion_order::dsl::tx_digest.eq(digest.inner().to_vec()),
///                         tx_insertion_order::dsl::insertion_order.eq(123),
///                     ),
///                     conn
///                 );
///                 Ok::<(), IndexerError>(())
///             },
///             Duration::from_secs(60)
///         )
///             .unwrap()
///     });
///
/// let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
///     Arc::new(sim),
///     data_ingestion_path,
///     None,
///     Some("indexer_ingestion_tests_db"),
///     Some(emulate_insertion_order_set_earlier_by_optimistic_indexing),
/// )
/// .await;
/// ```
pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
61
/// Selects which indexer component a test should run.
pub enum IndexerTypeConfig {
    /// Run the JSON-RPC reader. `reader_mode_rpc_url` must parse as a
    /// `SocketAddr`; it becomes the RPC server's bind address and port.
    Reader {
        reader_mode_rpc_url: String,
    },
    /// Run the checkpoint-ingesting writer.
    Writer {
        /// Lag configuration for the objects snapshot handler.
        snapshot_config: SnapshotLagConfig,
        /// Number of epochs of data to retain; `None` presumably disables
        /// pruning — confirm in `Indexer::start_writer_with_config`.
        epochs_to_keep: Option<u64>,
    },
    /// Run the analytical worker (see `Indexer::start_analytical_worker`).
    AnalyticalWorker,
}
72
73impl IndexerTypeConfig {
74    pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
75        Self::Reader {
76            reader_mode_rpc_url,
77        }
78    }
79
80    pub fn writer_mode(
81        snapshot_config: Option<SnapshotLagConfig>,
82        epochs_to_keep: Option<u64>,
83    ) -> Self {
84        Self::Writer {
85            snapshot_config: snapshot_config.unwrap_or_default(),
86            epochs_to_keep,
87        }
88    }
89}
90
91pub async fn start_test_indexer(
92    db_url: String,
93    reset_db: bool,
94    db_init_hook: Option<DBInitHook>,
95    rpc_url: String,
96    reader_writer_config: IndexerTypeConfig,
97    data_ingestion_path: Option<PathBuf>,
98) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
99    start_test_indexer_impl(
100        db_url,
101        reset_db,
102        db_init_hook,
103        rpc_url,
104        reader_writer_config,
105        data_ingestion_path,
106        CancellationToken::new(),
107    )
108    .await
109}
110
/// Starts an indexer reader or writer for testing depending on the
/// `reader_writer_config`.
///
/// Builds an [`IndexerConfig`] for a fullnode-sync worker, (re)creates the
/// backing Postgres store, runs the optional `db_init_hook`, and then spawns
/// the requested indexer flavor on its own tokio task. Returns the store
/// together with the spawned task's join handle.
///
/// Note: `cancel` is only honored by the `Writer` variant; the reader and
/// analytical-worker tasks ignore it.
pub async fn start_test_indexer_impl(
    db_url: String,
    reset_db: bool,
    db_init_hook: Option<DBInitHook>,
    rpc_url: String,
    reader_writer_config: IndexerTypeConfig,
    data_ingestion_path: Option<PathBuf>,
    cancel: CancellationToken,
) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
    let mut config = IndexerConfig {
        db_url: Some(db_url.clone().into()),
        // As fallback sync mechanism enable Rest Api if `data_ingestion_path` was not provided
        remote_store_url: data_ingestion_path
            .is_none()
            .then_some(format!("{rpc_url}/api/v1")),
        rpc_client_url: rpc_url,
        reset_db,
        fullnode_sync_worker: true,
        rpc_server_worker: false,
        data_ingestion_path,
        ..Default::default()
    };

    // Recreate the database (when requested) and, on top of that, reset the
    // schema via `crate::db::reset_database` on the fresh database.
    let store = create_pg_store(config.get_db_url().unwrap(), reset_db);
    if config.reset_db {
        crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
    }
    // Let the test seed initial DB state before the indexer starts.
    if let Some(db_init_hook) = db_init_hook {
        db_init_hook(&store);
    }

    let registry = prometheus::Registry::default();
    let handle = match reader_writer_config {
        IndexerTypeConfig::Reader {
            reader_mode_rpc_url,
        } => {
            let reader_mode_rpc_url = reader_mode_rpc_url
                .parse::<SocketAddr>()
                .expect("Unable to parse fullnode address");
            // Flip the config from sync-worker mode to RPC-server mode.
            config.fullnode_sync_worker = false;
            config.rpc_server_worker = true;
            config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
            config.rpc_server_port = reader_mode_rpc_url.port();
            tokio::spawn(async move { Indexer::start_reader(&config, &registry, db_url).await })
        }
        IndexerTypeConfig::Writer {
            snapshot_config,
            epochs_to_keep,
        } => {
            // The writer takes ownership of a store clone; the original is
            // returned to the caller below.
            let store_clone = store.clone();

            init_metrics(&registry);
            let indexer_metrics = IndexerMetrics::new(&registry);

            tokio::spawn(async move {
                Indexer::start_writer_with_config(
                    &config,
                    store_clone,
                    indexer_metrics,
                    snapshot_config,
                    epochs_to_keep,
                    cancel,
                )
                .await
            })
        }
        IndexerTypeConfig::AnalyticalWorker => {
            // The analytical worker wraps the same connection pool in its own
            // store type.
            let store = PgIndexerAnalyticalStore::new(store.blocking_cp());

            init_metrics(&registry);
            let indexer_metrics = IndexerMetrics::new(&registry);

            tokio::spawn(
                async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
            )
        }
    };

    (store, handle)
}
193
/// Manage a test database for integration tests.
pub struct TestDatabase {
    /// Connection URL of the managed database, kept secret so it is not
    /// accidentally logged.
    pub url: Secret<String>,
    /// Name of the managed database (last `/`-separated segment of `url`).
    db_name: String,
    /// Connection to the server's default `postgres` database, used to issue
    /// CREATE/DROP DATABASE statements against the managed database.
    connection: PoolConnection,
    /// Pool settings reused when building pools for the managed database.
    pool_config: ConnectionPoolConfig,
}
201
202impl TestDatabase {
203    pub fn new(db_url: Secret<String>) -> Self {
204        let pool_config = ConnectionPoolConfig::default();
205        let db_name = db_url
206            .expose_secret()
207            .split('/')
208            .next_back()
209            .unwrap()
210            .into();
211        let (default_url, _) = replace_db_name(db_url.expose_secret(), "postgres");
212        let blocking_pool =
213            new_connection_pool_with_config(&default_url, Some(5), pool_config).unwrap();
214        let connection = blocking_pool.get().unwrap();
215        Self {
216            url: db_url,
217            db_name,
218            connection,
219            pool_config,
220        }
221    }
222
223    /// Drop the database in the server if it exists.
224    pub fn drop_if_exists(&mut self) {
225        self.connection
226            .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
227            .unwrap();
228    }
229
230    /// Create the database in the server.
231    pub fn create(&mut self) {
232        self.connection
233            .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
234            .unwrap();
235    }
236
237    /// Drop and recreate the database in the server.
238    pub fn recreate(&mut self) {
239        self.drop_if_exists();
240        self.create();
241    }
242
243    /// Create a new connection pool to the database.
244    pub fn to_connection_pool(&self) -> ConnectionPool {
245        new_connection_pool_with_config(self.url.expose_secret(), Some(5), self.pool_config)
246            .unwrap()
247    }
248
249    pub fn reset_db(&mut self) {
250        crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
251    }
252}
253
254pub fn create_pg_store(db_url: Secret<String>, reset_database: bool) -> PgIndexerStore {
255    // Reduce the connection pool size to 10 for testing
256    // to prevent maxing out
257    info!("Setting DB_POOL_SIZE to 10");
258    std::env::set_var("DB_POOL_SIZE", "10");
259
260    let registry = prometheus::Registry::default();
261
262    init_metrics(&registry);
263
264    let indexer_metrics = IndexerMetrics::new(&registry);
265
266    let mut test_db = TestDatabase::new(db_url);
267    if reset_database {
268        test_db.recreate();
269    }
270
271    PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
272}
273
/// Swap the database name (the segment after the last `/`) in `db_url` for
/// `new_db_name`, returning the rewritten URL together with the old name.
///
/// # Panics
///
/// Panics if `db_url` contains no `/`.
fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
    let (base, old_db_name) = db_url.rsplit_once('/').expect("Unable to find / in db_url");
    (format!("{base}/{new_db_name}"), old_db_name.to_string())
}
283
284pub async fn force_delete_database(db_url: String) {
285    // Replace the database name with the default `postgres`, which should be the
286    // last string after `/` This is necessary because you can't drop a database
287    // while being connected to it. Hence switch to the default `postgres`
288    // database to drop the active database.
289    let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
290    let pool_config = ConnectionPoolConfig::default();
291
292    let blocking_pool =
293        new_connection_pool_with_config(&default_db_url, Some(5), pool_config).unwrap();
294    blocking_pool
295        .get()
296        .unwrap()
297        .batch_execute(&format!("DROP DATABASE IF EXISTS {} WITH (FORCE)", db_name))
298        .unwrap();
299}
300
/// Builder producing an [`IotaTransactionBlockResponse`] that exposes only a
/// chosen subset of the fields of an existing full response.
#[derive(Clone)]
pub struct IotaTransactionBlockResponseBuilder<'a> {
    /// The partial response assembled by the `with_*` methods.
    response: IotaTransactionBlockResponse,
    /// The complete response the selected fields are copied from.
    full_response: &'a IotaTransactionBlockResponse,
}
306
307impl<'a> IotaTransactionBlockResponseBuilder<'a> {
308    pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
309        Self {
310            response: IotaTransactionBlockResponse::default(),
311            full_response,
312        }
313    }
314
315    pub fn with_input(mut self) -> Self {
316        self.response = IotaTransactionBlockResponse {
317            transaction: self.full_response.transaction.clone(),
318            ..self.response
319        };
320        self
321    }
322
323    pub fn with_raw_input(mut self) -> Self {
324        self.response = IotaTransactionBlockResponse {
325            raw_transaction: self.full_response.raw_transaction.clone(),
326            ..self.response
327        };
328        self
329    }
330
331    pub fn with_effects(mut self) -> Self {
332        self.response = IotaTransactionBlockResponse {
333            effects: self.full_response.effects.clone(),
334            ..self.response
335        };
336        self
337    }
338
339    pub fn with_events(mut self) -> Self {
340        self.response = IotaTransactionBlockResponse {
341            events: self.full_response.events.clone(),
342            ..self.response
343        };
344        self
345    }
346
347    pub fn with_balance_changes(mut self) -> Self {
348        self.response = IotaTransactionBlockResponse {
349            balance_changes: self.full_response.balance_changes.clone(),
350            ..self.response
351        };
352        self
353    }
354
355    pub fn with_object_changes(mut self) -> Self {
356        self.response = IotaTransactionBlockResponse {
357            object_changes: self.full_response.object_changes.clone(),
358            ..self.response
359        };
360        self
361    }
362
363    pub fn with_input_and_changes(mut self) -> Self {
364        self.response = IotaTransactionBlockResponse {
365            transaction: self.full_response.transaction.clone(),
366            balance_changes: self.full_response.balance_changes.clone(),
367            object_changes: self.full_response.object_changes.clone(),
368            ..self.response
369        };
370        self
371    }
372
373    pub fn build(self) -> IotaTransactionBlockResponse {
374        IotaTransactionBlockResponse {
375            transaction: self.response.transaction,
376            raw_transaction: self.response.raw_transaction,
377            effects: self.response.effects,
378            events: self.response.events,
379            balance_changes: self.response.balance_changes,
380            object_changes: self.response.object_changes,
381            // Use full response for any fields that aren't showable
382            ..self.full_response.clone()
383        }
384    }
385}