1use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::{
14 IndexerMetrics,
15 config::{
16 IngestionConfig, IotaNamesOptions, PruningOptions, RetentionConfig, SnapshotLagConfig,
17 },
18 db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
19 errors::IndexerError,
20 indexer::Indexer,
21 store::{PgIndexerAnalyticalStore, PgIndexerStore},
22};
23
24pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
61
62pub enum IndexerTypeConfig {
63 Reader {
64 reader_mode_rpc_url: String,
65 },
66 Writer {
67 snapshot_config: SnapshotLagConfig,
68 retention_config: Option<RetentionConfig>,
69 optimistic_pruner_batch_size: Option<u64>,
70 },
71 AnalyticalWorker,
72}
73
74impl IndexerTypeConfig {
75 pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
76 Self::Reader {
77 reader_mode_rpc_url,
78 }
79 }
80
81 pub fn writer_mode(
82 snapshot_config: Option<SnapshotLagConfig>,
83 pruning_options: Option<PruningOptions>,
84 ) -> Self {
85 Self::Writer {
86 snapshot_config: snapshot_config.unwrap_or_default(),
87 retention_config: pruning_options.as_ref().and_then(|pruning_options| {
88 pruning_options
89 .epochs_to_keep
90 .map(RetentionConfig::new_with_default_retention_only_for_testing)
91 }),
92 optimistic_pruner_batch_size: pruning_options
93 .and_then(|pruning_options| pruning_options.optimistic_pruner_batch_size),
94 }
95 }
96}
97
98pub async fn start_test_indexer(
99 db_url: String,
100 reset_db: bool,
101 db_init_hook: Option<DBInitHook>,
102 rpc_url: String,
103 reader_writer_config: IndexerTypeConfig,
104 data_ingestion_path: Option<PathBuf>,
105) -> (
106 PgIndexerStore,
107 JoinHandle<Result<(), IndexerError>>,
108 CancellationToken,
109) {
110 let token = CancellationToken::new();
111 let (store, handle) = start_test_indexer_impl(
112 db_url,
113 reset_db,
114 db_init_hook,
115 rpc_url,
116 reader_writer_config,
117 data_ingestion_path,
118 token.clone(),
119 )
120 .await;
121 (store, handle, token)
122}
123
124pub async fn start_test_indexer_impl(
127 db_url: String,
128 reset_db: bool,
129 db_init_hook: Option<DBInitHook>,
130 rpc_url: String,
131 reader_writer_config: IndexerTypeConfig,
132 data_ingestion_path: Option<PathBuf>,
133 cancel: CancellationToken,
134) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
135 let store = create_pg_store(&db_url, reset_db);
136 if reset_db {
137 crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
138 }
139 if let Some(db_init_hook) = db_init_hook {
140 db_init_hook(&store);
141 }
142
143 let registry = prometheus::Registry::default();
144 init_metrics(®istry);
145 let indexer_metrics = IndexerMetrics::new(®istry);
146
147 let handle = match reader_writer_config {
148 IndexerTypeConfig::Reader {
149 reader_mode_rpc_url,
150 } => {
151 let config = crate::config::JsonRpcConfig {
152 iota_names_options: IotaNamesOptions::default(),
153 historic_fallback_options: Default::default(),
154 rpc_address: reader_mode_rpc_url.parse().unwrap(),
155 rpc_client_url: rpc_url,
156 };
157 let pool = store.blocking_cp();
158 let store_clone = store.clone();
159 tokio::spawn(async move {
160 Indexer::start_reader(&config, store_clone, ®istry, pool, indexer_metrics).await
161 })
162 }
163 IndexerTypeConfig::Writer {
164 snapshot_config,
165 retention_config,
166 optimistic_pruner_batch_size,
167 } => {
168 let store_clone = store.clone();
169 let mut ingestion_config = IngestionConfig::default();
170 ingestion_config.sources.remote_store_url = data_ingestion_path
171 .is_none()
172 .then_some(format!("{rpc_url}/api/v1").parse().unwrap());
173 ingestion_config.sources.data_ingestion_path = data_ingestion_path;
174 ingestion_config.sources.rpc_client_url = Some(rpc_url.parse().unwrap());
175
176 tokio::spawn(async move {
177 Indexer::start_writer_with_config(
178 &ingestion_config,
179 store_clone,
180 indexer_metrics,
181 snapshot_config,
182 retention_config,
183 optimistic_pruner_batch_size,
184 cancel,
185 )
186 .await
187 })
188 }
189 IndexerTypeConfig::AnalyticalWorker => {
190 let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
191
192 tokio::spawn(
193 async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
194 )
195 }
196 };
197
198 (store, handle)
199}
200
201pub struct TestDatabase {
203 pub url: String,
204 db_name: String,
205 connection: PoolConnection,
206 pool_config: ConnectionPoolConfig,
207}
208
209impl TestDatabase {
210 pub fn new(db_url: String) -> Self {
211 let pool_config = ConnectionPoolConfig {
214 pool_size: 5,
215 ..Default::default()
216 };
217
218 let db_name = db_url.split('/').next_back().unwrap().into();
219 let (default_url, _) = replace_db_name(&db_url, "postgres");
220 let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
221 let connection = blocking_pool.get().unwrap();
222 Self {
223 url: db_url,
224 db_name,
225 connection,
226 pool_config,
227 }
228 }
229
230 pub fn drop_if_exists(&mut self) {
232 self.connection
233 .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
234 .unwrap();
235 }
236
237 pub fn create(&mut self) {
239 self.connection
240 .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
241 .unwrap();
242 }
243
244 pub fn recreate(&mut self) {
246 self.drop_if_exists();
247 self.create();
248 }
249
250 pub fn to_connection_pool(&self) -> ConnectionPool {
252 new_connection_pool(&self.url, &self.pool_config).unwrap()
253 }
254
255 pub fn reset_db(&mut self) {
256 crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
257 }
258}
259
260pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
261 let registry = prometheus::Registry::default();
262 init_metrics(®istry);
263 let indexer_metrics = IndexerMetrics::new(®istry);
264
265 let mut test_db = TestDatabase::new(db_url.to_string());
266 if reset_database {
267 test_db.recreate();
268 }
269
270 PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
271}
272
273fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
274 let pos = db_url.rfind('/').expect("unable to find / in db_url");
275 let old_db_name = &db_url[pos + 1..];
276
277 (
278 format!("{}/{}", &db_url[..pos], new_db_name),
279 old_db_name.to_string(),
280 )
281}
282
283pub async fn force_delete_database(db_url: String) {
284 let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
289 let mut pool_config = ConnectionPoolConfig::default();
290 pool_config.set_pool_size(1);
291
292 let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
293 blocking_pool
294 .get()
295 .unwrap()
296 .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
297 .unwrap();
298}
299
300#[derive(Clone)]
301pub struct IotaTransactionBlockResponseBuilder<'a> {
302 response: IotaTransactionBlockResponse,
303 full_response: &'a IotaTransactionBlockResponse,
304}
305
306impl<'a> IotaTransactionBlockResponseBuilder<'a> {
307 pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
308 Self {
309 response: IotaTransactionBlockResponse::default(),
310 full_response,
311 }
312 }
313
314 pub fn with_input(mut self) -> Self {
315 self.response = IotaTransactionBlockResponse {
316 transaction: self.full_response.transaction.clone(),
317 ..self.response
318 };
319 self
320 }
321
322 pub fn with_raw_input(mut self) -> Self {
323 self.response = IotaTransactionBlockResponse {
324 raw_transaction: self.full_response.raw_transaction.clone(),
325 ..self.response
326 };
327 self
328 }
329
330 pub fn with_effects(mut self) -> Self {
331 self.response = IotaTransactionBlockResponse {
332 effects: self.full_response.effects.clone(),
333 ..self.response
334 };
335 self
336 }
337
338 pub fn with_events(mut self) -> Self {
339 self.response = IotaTransactionBlockResponse {
340 events: self.full_response.events.clone(),
341 ..self.response
342 };
343 self
344 }
345
346 pub fn with_balance_changes(mut self) -> Self {
347 self.response = IotaTransactionBlockResponse {
348 balance_changes: self.full_response.balance_changes.clone(),
349 ..self.response
350 };
351 self
352 }
353
354 pub fn with_object_changes(mut self) -> Self {
355 self.response = IotaTransactionBlockResponse {
356 object_changes: self.full_response.object_changes.clone(),
357 ..self.response
358 };
359 self
360 }
361
362 pub fn with_input_and_changes(mut self) -> Self {
363 self.response = IotaTransactionBlockResponse {
364 transaction: self.full_response.transaction.clone(),
365 balance_changes: self.full_response.balance_changes.clone(),
366 object_changes: self.full_response.object_changes.clone(),
367 ..self.response
368 };
369 self
370 }
371
372 pub fn build(self) -> IotaTransactionBlockResponse {
373 IotaTransactionBlockResponse {
374 transaction: self.response.transaction,
375 raw_transaction: self.response.raw_transaction,
376 effects: self.response.effects,
377 events: self.response.events,
378 balance_changes: self.response.balance_changes,
379 object_changes: self.response.object_changes,
380 ..self.full_response.clone()
382 }
383 }
384}
385
386pub fn db_url(db_name: &str) -> String {
390 format!("postgres://postgres:postgrespw@localhost:5432/{db_name}")
391}
392
393#[derive(QueryableByName, Debug)]
395pub struct RowCount {
396 #[diesel(sql_type = BigInt)]
397 pub cnt: i64,
398}