iota_indexer/
test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf};
6
7use diesel::connection::SimpleConnection;
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use secrecy::{ExposeSecret, Secret};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tracing::info;
14
15use crate::{
16    IndexerConfig, IndexerMetrics,
17    db::{ConnectionPoolConfig, new_connection_pool_with_config},
18    errors::IndexerError,
19    handlers::objects_snapshot_handler::SnapshotLagConfig,
20    indexer::Indexer,
21    store::{PgIndexerAnalyticalStore, PgIndexerStore},
22};
23
24/// Type to create hooks to alter initial indexer DB state in tests.
25/// Those hooks are meant to be called after DB reset (if it occurs) and before
26/// indexer is started.
27///
28/// Example:
29///
30/// ```ignore
31/// let emulate_insertion_order_set_earlier_by_optimistic_indexing: DBInitHook =
32///     Box::new(move |pg_store: &PgIndexerStore| {
33///         transactional_blocking_with_retry!(
34///             &pg_store.blocking_cp(),
35///             |conn| {
36///                 insert_or_ignore_into!(
37///                     tx_insertion_order::table,
38///                     (
39///                         tx_insertion_order::dsl::tx_digest.eq(digest.inner().to_vec()),
40///                         tx_insertion_order::dsl::insertion_order.eq(123),
41///                     ),
42///                     conn
43///                 );
44///                 Ok::<(), IndexerError>(())
45///             },
46///             Duration::from_secs(60)
47///         )
48///             .unwrap()
49///     });
50///
51/// let (_, pg_store, _) = start_simulacrum_rest_api_with_write_indexer(
52///     Arc::new(sim),
53///     data_ingestion_path,
54///     None,
55///     Some("indexer_ingestion_tests_db"),
56///     Some(emulate_insertion_order_set_earlier_by_optimistic_indexing),
57/// )
58/// .await;
59/// ```
60pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
61
62pub enum IndexerTypeConfig {
63    Reader {
64        reader_mode_rpc_url: String,
65    },
66    Writer {
67        snapshot_config: SnapshotLagConfig,
68        epochs_to_keep: Option<u64>,
69    },
70    AnalyticalWorker,
71}
72
73impl IndexerTypeConfig {
74    pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
75        Self::Reader {
76            reader_mode_rpc_url,
77        }
78    }
79
80    pub fn writer_mode(
81        snapshot_config: Option<SnapshotLagConfig>,
82        epochs_to_keep: Option<u64>,
83    ) -> Self {
84        Self::Writer {
85            snapshot_config: snapshot_config.unwrap_or_default(),
86            epochs_to_keep,
87        }
88    }
89}
90
91pub async fn start_test_indexer(
92    db_url: String,
93    reset_db: bool,
94    db_init_hook: Option<DBInitHook>,
95    rpc_url: String,
96    reader_writer_config: IndexerTypeConfig,
97    data_ingestion_path: Option<PathBuf>,
98) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
99    start_test_indexer_impl(
100        db_url,
101        reset_db,
102        db_init_hook,
103        rpc_url,
104        reader_writer_config,
105        data_ingestion_path,
106        CancellationToken::new(),
107    )
108    .await
109}
110
111/// Starts an indexer reader or writer for testing depending on the
112/// `reader_writer_config`.
113pub async fn start_test_indexer_impl(
114    db_url: String,
115    reset_db: bool,
116    db_init_hook: Option<DBInitHook>,
117    rpc_url: String,
118    reader_writer_config: IndexerTypeConfig,
119    data_ingestion_path: Option<PathBuf>,
120    cancel: CancellationToken,
121) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
122    let mut config = IndexerConfig {
123        db_url: Some(db_url.clone().into()),
124        // As fallback sync mechanism enable Rest Api if `data_ingestion_path` was not provided
125        remote_store_url: data_ingestion_path
126            .is_none()
127            .then_some(format!("{rpc_url}/api/v1")),
128        rpc_client_url: rpc_url,
129        reset_db,
130        fullnode_sync_worker: true,
131        rpc_server_worker: false,
132        data_ingestion_path,
133        ..Default::default()
134    };
135
136    let store = create_pg_store(config.get_db_url().unwrap(), reset_db);
137    if config.reset_db {
138        crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
139    }
140    if let Some(db_init_hook) = db_init_hook {
141        db_init_hook(&store);
142    }
143
144    let registry = prometheus::Registry::default();
145    let handle = match reader_writer_config {
146        IndexerTypeConfig::Reader {
147            reader_mode_rpc_url,
148        } => {
149            let reader_mode_rpc_url = reader_mode_rpc_url
150                .parse::<SocketAddr>()
151                .expect("Unable to parse fullnode address");
152            config.fullnode_sync_worker = false;
153            config.rpc_server_worker = true;
154            config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
155            config.rpc_server_port = reader_mode_rpc_url.port();
156            tokio::spawn(async move { Indexer::start_reader(&config, &registry, db_url).await })
157        }
158        IndexerTypeConfig::Writer {
159            snapshot_config,
160            epochs_to_keep,
161        } => {
162            let store_clone = store.clone();
163
164            init_metrics(&registry);
165            let indexer_metrics = IndexerMetrics::new(&registry);
166
167            tokio::spawn(async move {
168                Indexer::start_writer_with_config(
169                    &config,
170                    store_clone,
171                    indexer_metrics,
172                    snapshot_config,
173                    epochs_to_keep,
174                    cancel,
175                )
176                .await
177            })
178        }
179        IndexerTypeConfig::AnalyticalWorker => {
180            let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
181
182            init_metrics(&registry);
183            let indexer_metrics = IndexerMetrics::new(&registry);
184
185            tokio::spawn(
186                async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
187            )
188        }
189    };
190
191    (store, handle)
192}
193
194pub fn create_pg_store(db_url: Secret<String>, reset_database: bool) -> PgIndexerStore {
195    // Reduce the connection pool size to 10 for testing
196    // to prevent maxing out
197    info!("Setting DB_POOL_SIZE to 10");
198    std::env::set_var("DB_POOL_SIZE", "10");
199
200    // Set connection timeout for tests to 1 second
201    let pool_config = ConnectionPoolConfig::default();
202
203    let registry = prometheus::Registry::default();
204
205    init_metrics(&registry);
206
207    let indexer_metrics = IndexerMetrics::new(&registry);
208
209    let mut parsed_url = db_url.clone();
210    if reset_database {
211        let db_name = parsed_url.expose_secret().split('/').next_back().unwrap();
212        // Switch to default to create a new database
213        let (default_db_url, _) = replace_db_name(parsed_url.expose_secret(), "postgres");
214
215        // Open in default mode
216        let blocking_pool =
217            new_connection_pool_with_config(&default_db_url, Some(5), pool_config).unwrap();
218        let mut default_conn = blocking_pool.get().unwrap();
219
220        // Delete the old db if it exists
221        default_conn
222            .batch_execute(&format!("DROP DATABASE IF EXISTS {}", db_name))
223            .unwrap();
224
225        // Create the new db
226        default_conn
227            .batch_execute(&format!("CREATE DATABASE {}", db_name))
228            .unwrap();
229        parsed_url = replace_db_name(parsed_url.expose_secret(), db_name)
230            .0
231            .into();
232    }
233
234    let blocking_pool =
235        new_connection_pool_with_config(parsed_url.expose_secret(), Some(5), pool_config).unwrap();
236    PgIndexerStore::new(blocking_pool.clone(), indexer_metrics.clone())
237}
238
/// Swaps the database name in a connection URL.
///
/// The database name is the text after the last `/` that precedes the
/// optional `?query` part. Returns `(new_url, old_db_name)`, where `new_url`
/// is `db_url` with the database name replaced by `new_db_name` and the
/// query string, if any, kept as is.
///
/// # Panics
///
/// Panics if the part of `db_url` before the query contains no `/`.
fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
    // Split the query off first: it may itself contain `/`
    // (e.g. `?host=/var/run/postgresql`) and is not part of the db name.
    let query_start = db_url.find('?').unwrap_or(db_url.len());
    let (location, query) = db_url.split_at(query_start);

    let (prefix, old_db_name) = location
        .rsplit_once('/')
        .expect("Unable to find / in db_url");

    (
        format!("{prefix}/{new_db_name}{query}"),
        old_db_name.to_string(),
    )
}
248
249pub async fn force_delete_database(db_url: String) {
250    // Replace the database name with the default `postgres`, which should be the
251    // last string after `/` This is necessary because you can't drop a database
252    // while being connected to it. Hence switch to the default `postgres`
253    // database to drop the active database.
254    let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
255    let pool_config = ConnectionPoolConfig::default();
256
257    let blocking_pool =
258        new_connection_pool_with_config(&default_db_url, Some(5), pool_config).unwrap();
259    blocking_pool
260        .get()
261        .unwrap()
262        .batch_execute(&format!("DROP DATABASE IF EXISTS {} WITH (FORCE)", db_name))
263        .unwrap();
264}
265
266#[derive(Clone)]
267pub struct IotaTransactionBlockResponseBuilder<'a> {
268    response: IotaTransactionBlockResponse,
269    full_response: &'a IotaTransactionBlockResponse,
270}
271
272impl<'a> IotaTransactionBlockResponseBuilder<'a> {
273    pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
274        Self {
275            response: IotaTransactionBlockResponse::default(),
276            full_response,
277        }
278    }
279
280    pub fn with_input(mut self) -> Self {
281        self.response = IotaTransactionBlockResponse {
282            transaction: self.full_response.transaction.clone(),
283            ..self.response
284        };
285        self
286    }
287
288    pub fn with_raw_input(mut self) -> Self {
289        self.response = IotaTransactionBlockResponse {
290            raw_transaction: self.full_response.raw_transaction.clone(),
291            ..self.response
292        };
293        self
294    }
295
296    pub fn with_effects(mut self) -> Self {
297        self.response = IotaTransactionBlockResponse {
298            effects: self.full_response.effects.clone(),
299            ..self.response
300        };
301        self
302    }
303
304    pub fn with_events(mut self) -> Self {
305        self.response = IotaTransactionBlockResponse {
306            events: self.full_response.events.clone(),
307            ..self.response
308        };
309        self
310    }
311
312    pub fn with_balance_changes(mut self) -> Self {
313        self.response = IotaTransactionBlockResponse {
314            balance_changes: self.full_response.balance_changes.clone(),
315            ..self.response
316        };
317        self
318    }
319
320    pub fn with_object_changes(mut self) -> Self {
321        self.response = IotaTransactionBlockResponse {
322            object_changes: self.full_response.object_changes.clone(),
323            ..self.response
324        };
325        self
326    }
327
328    pub fn with_input_and_changes(mut self) -> Self {
329        self.response = IotaTransactionBlockResponse {
330            transaction: self.full_response.transaction.clone(),
331            balance_changes: self.full_response.balance_changes.clone(),
332            object_changes: self.full_response.object_changes.clone(),
333            ..self.response
334        };
335        self
336    }
337
338    pub fn build(self) -> IotaTransactionBlockResponse {
339        IotaTransactionBlockResponse {
340            transaction: self.response.transaction,
341            raw_transaction: self.response.raw_transaction,
342            effects: self.response.effects,
343            events: self.response.events,
344            balance_changes: self.response.balance_changes,
345            object_changes: self.response.object_changes,
346            // Use full response for any fields that aren't showable
347            ..self.full_response.clone()
348        }
349    }
350}