1use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use url::Url;
13
14use crate::{
15 IndexerMetrics,
16 config::{IngestionConfig, IotaNamesOptions, PruningOptions, RetentionConfig},
17 db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
18 errors::IndexerError,
19 indexer::Indexer,
20 store::{PgIndexerAnalyticalStore, PgIndexerStore},
21};
22
23const TEST_PRUNING_DELAY_MS: u64 = 1000; pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
63
64pub enum IndexerTypeConfig {
65 Reader {
66 reader_mode_rpc_url: String,
67 },
68 Writer {
69 retention_config: Option<RetentionConfig>,
70 pruning_delay_ms: u64,
71 pruning_batch_size: u64,
72 },
73 AnalyticalWorker,
74}
75
76impl IndexerTypeConfig {
77 pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
78 Self::Reader {
79 reader_mode_rpc_url,
80 }
81 }
82
83 pub fn writer_mode(pruning_options: Option<PruningOptions>) -> Self {
84 let opts = pruning_options.unwrap_or_default();
85 Self::Writer {
86 retention_config: opts
87 .epochs_to_keep
88 .map(RetentionConfig::new_with_default_retention_only_for_testing),
89 pruning_delay_ms: TEST_PRUNING_DELAY_MS,
90 pruning_batch_size: opts.pruning_batch_size,
91 }
92 }
93}
94
95pub async fn start_test_indexer(
96 db_url: String,
97 reset_db: bool,
98 db_init_hook: Option<DBInitHook>,
99 rpc_url: String,
100 reader_writer_config: IndexerTypeConfig,
101 data_ingestion_path: Option<PathBuf>,
102) -> (
103 PgIndexerStore,
104 JoinHandle<Result<(), IndexerError>>,
105 CancellationToken,
106) {
107 let token = CancellationToken::new();
108 let (store, handle) = start_test_indexer_impl(
109 db_url,
110 reset_db,
111 db_init_hook,
112 rpc_url,
113 reader_writer_config,
114 data_ingestion_path,
115 token.clone(),
116 )
117 .await;
118 (store, handle, token)
119}
120
121pub async fn start_test_indexer_impl(
131 db_url: String,
132 reset_db: bool,
133 db_init_hook: Option<DBInitHook>,
134 rpc_url: String,
135 reader_writer_config: IndexerTypeConfig,
136 data_ingestion_path: Option<PathBuf>,
137 cancel: CancellationToken,
138) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
139 let store = create_pg_store(&db_url, reset_db);
140 if reset_db {
141 crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
142 }
143 if let Some(db_init_hook) = db_init_hook {
144 db_init_hook(&store);
145 }
146
147 let registry = prometheus::Registry::default();
148 init_metrics(®istry);
149 let indexer_metrics = IndexerMetrics::new(®istry);
150
151 let handle = match reader_writer_config {
152 IndexerTypeConfig::Reader {
153 reader_mode_rpc_url,
154 } => {
155 let config = crate::config::JsonRpcConfig {
156 iota_names_options: IotaNamesOptions::default(),
157 historic_fallback_options: Default::default(),
158 rpc_address: reader_mode_rpc_url.parse().unwrap(),
159 rpc_client_url: rpc_url,
160 };
161 let pool = store.blocking_cp();
162 let store_clone = store.clone();
163 tokio::spawn(async move {
164 Indexer::start_reader(
165 &config,
166 store_clone,
167 ®istry,
168 pool,
169 indexer_metrics,
170 CancellationToken::new(),
171 )
172 .await
173 })
174 }
175 IndexerTypeConfig::Writer {
176 retention_config,
177 pruning_delay_ms,
178 pruning_batch_size,
179 } => {
180 let fullnode_rpc_url = rpc_url.parse::<Url>().unwrap();
181 let store_clone = store.clone();
182 let mut ingestion_config = IngestionConfig::default();
183 ingestion_config.sources.remote_store_url =
184 data_ingestion_path.is_none().then_some(fullnode_rpc_url);
185 ingestion_config.sources.data_ingestion_path = data_ingestion_path;
186
187 tokio::spawn(async move {
188 Indexer::start_writer_with_config(
189 &ingestion_config,
190 store_clone,
191 indexer_metrics,
192 retention_config,
193 pruning_delay_ms,
194 pruning_batch_size,
195 cancel,
196 )
197 .await
198 })
199 }
200 IndexerTypeConfig::AnalyticalWorker => {
201 let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
202
203 tokio::spawn(
204 async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
205 )
206 }
207 };
208
209 (store, handle)
210}
211
212pub struct TestDatabase {
214 pub url: String,
215 db_name: String,
216 connection: PoolConnection,
217 pool_config: ConnectionPoolConfig,
218}
219
220impl TestDatabase {
221 pub fn new(db_url: String) -> Self {
222 let pool_config = ConnectionPoolConfig {
225 pool_size: 5,
226 ..Default::default()
227 };
228
229 let db_name = db_url.split('/').next_back().unwrap().into();
230 let (default_url, _) = replace_db_name(&db_url, "postgres");
231 let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
232 let connection = blocking_pool.get().unwrap();
233 Self {
234 url: db_url,
235 db_name,
236 connection,
237 pool_config,
238 }
239 }
240
241 pub fn drop_if_exists(&mut self) {
243 self.connection
244 .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
245 .unwrap();
246 }
247
248 pub fn create(&mut self) {
250 self.connection
251 .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
252 .unwrap();
253 }
254
255 pub fn recreate(&mut self) {
257 self.drop_if_exists();
258 self.create();
259 }
260
261 pub fn to_connection_pool(&self) -> ConnectionPool {
263 new_connection_pool(&self.url, &self.pool_config).unwrap()
264 }
265
266 pub fn reset_db(&mut self) {
267 crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
268 }
269}
270
271pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
272 let registry = prometheus::Registry::default();
273 init_metrics(®istry);
274 let indexer_metrics = IndexerMetrics::new(®istry);
275
276 let mut test_db = TestDatabase::new(db_url.to_string());
277 if reset_database {
278 test_db.recreate();
279 }
280
281 PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics)
282}
283
284fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
285 let pos = db_url.rfind('/').expect("unable to find / in db_url");
286 let old_db_name = &db_url[pos + 1..];
287
288 (
289 format!("{}/{}", &db_url[..pos], new_db_name),
290 old_db_name.to_string(),
291 )
292}
293
294pub async fn force_delete_database(db_url: String) {
295 let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
300 let mut pool_config = ConnectionPoolConfig::default();
301 pool_config.set_pool_size(1);
302
303 let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
304 blocking_pool
305 .get()
306 .unwrap()
307 .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
308 .unwrap();
309}
310
311#[derive(Clone)]
312pub struct IotaTransactionBlockResponseBuilder<'a> {
313 response: IotaTransactionBlockResponse,
314 full_response: &'a IotaTransactionBlockResponse,
315}
316
317impl<'a> IotaTransactionBlockResponseBuilder<'a> {
318 pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
319 Self {
320 response: IotaTransactionBlockResponse::default(),
321 full_response,
322 }
323 }
324
325 pub fn with_input(mut self) -> Self {
326 self.response = IotaTransactionBlockResponse {
327 transaction: self.full_response.transaction.clone(),
328 ..self.response
329 };
330 self
331 }
332
333 pub fn with_raw_input(mut self) -> Self {
334 self.response = IotaTransactionBlockResponse {
335 raw_transaction: self.full_response.raw_transaction.clone(),
336 ..self.response
337 };
338 self
339 }
340
341 pub fn with_effects(mut self) -> Self {
342 self.response = IotaTransactionBlockResponse {
343 effects: self.full_response.effects.clone(),
344 ..self.response
345 };
346 self
347 }
348
349 pub fn with_events(mut self) -> Self {
350 self.response = IotaTransactionBlockResponse {
351 events: self.full_response.events.clone(),
352 ..self.response
353 };
354 self
355 }
356
357 pub fn with_balance_changes(mut self) -> Self {
358 self.response = IotaTransactionBlockResponse {
359 balance_changes: self.full_response.balance_changes.clone(),
360 ..self.response
361 };
362 self
363 }
364
365 pub fn with_object_changes(mut self) -> Self {
366 self.response = IotaTransactionBlockResponse {
367 object_changes: self.full_response.object_changes.clone(),
368 ..self.response
369 };
370 self
371 }
372
373 pub fn with_input_and_changes(mut self) -> Self {
374 self.response = IotaTransactionBlockResponse {
375 transaction: self.full_response.transaction.clone(),
376 balance_changes: self.full_response.balance_changes.clone(),
377 object_changes: self.full_response.object_changes.clone(),
378 ..self.response
379 };
380 self
381 }
382
383 pub fn build(self) -> IotaTransactionBlockResponse {
384 IotaTransactionBlockResponse {
385 transaction: self.response.transaction,
386 raw_transaction: self.response.raw_transaction,
387 effects: self.response.effects,
388 events: self.response.events,
389 balance_changes: self.response.balance_changes,
390 object_changes: self.response.object_changes,
391 ..self.full_response.clone()
393 }
394 }
395}
396
397pub fn db_url(db_name: &str) -> String {
401 format!("postgres://postgres:postgrespw@localhost:5432/{db_name}")
402}
403
404#[derive(QueryableByName, Debug)]
406pub struct RowCount {
407 #[diesel(sql_type = BigInt)]
408 pub cnt: i64,
409}