1use std::path::PathBuf;
6
7use diesel::connection::SimpleConnection;
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::{
14 IndexerMetrics,
15 config::{IngestionConfig, IotaNamesOptions, PruningOptions, SnapshotLagConfig},
16 db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
17 errors::IndexerError,
18 indexer::Indexer,
19 store::{PgIndexerAnalyticalStore, PgIndexerStore},
20};
21
/// One-shot callback that receives the test [`PgIndexerStore`].
///
/// `start_test_indexer_impl` invokes it (when provided) after the store has been created and
/// before the indexer task is spawned.
pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
59
/// Selects which indexer task the test helpers spawn, together with the settings that task
/// needs.
pub enum IndexerTypeConfig {
    /// Spawn the reader; `reader_mode_rpc_url` is parsed into the JSON-RPC `rpc_address`.
    Reader {
        reader_mode_rpc_url: String,
    },
    /// Spawn the writer with the given snapshot-lag and pruning settings.
    Writer {
        snapshot_config: SnapshotLagConfig,
        pruning_options: PruningOptions,
    },
    /// Spawn the analytical worker; it takes no additional settings.
    AnalyticalWorker,
}
70
71impl IndexerTypeConfig {
72 pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
73 Self::Reader {
74 reader_mode_rpc_url,
75 }
76 }
77
78 pub fn writer_mode(
79 snapshot_config: Option<SnapshotLagConfig>,
80 epochs_to_keep: Option<u64>,
81 ) -> Self {
82 Self::Writer {
83 snapshot_config: snapshot_config.unwrap_or_default(),
84 pruning_options: PruningOptions { epochs_to_keep },
85 }
86 }
87}
88
89pub async fn start_test_indexer(
90 db_url: String,
91 reset_db: bool,
92 db_init_hook: Option<DBInitHook>,
93 rpc_url: String,
94 reader_writer_config: IndexerTypeConfig,
95 data_ingestion_path: Option<PathBuf>,
96) -> (
97 PgIndexerStore,
98 JoinHandle<Result<(), IndexerError>>,
99 CancellationToken,
100) {
101 let token = CancellationToken::new();
102 let (store, handle) = start_test_indexer_impl(
103 db_url,
104 reset_db,
105 db_init_hook,
106 rpc_url,
107 reader_writer_config,
108 data_ingestion_path,
109 token.clone(),
110 )
111 .await;
112 (store, handle, token)
113}
114
115pub async fn start_test_indexer_impl(
118 db_url: String,
119 reset_db: bool,
120 db_init_hook: Option<DBInitHook>,
121 rpc_url: String,
122 reader_writer_config: IndexerTypeConfig,
123 data_ingestion_path: Option<PathBuf>,
124 cancel: CancellationToken,
125) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
126 let store = create_pg_store(&db_url, reset_db);
127 if reset_db {
128 crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
129 }
130 if let Some(db_init_hook) = db_init_hook {
131 db_init_hook(&store);
132 }
133
134 let registry = prometheus::Registry::default();
135 let handle = match reader_writer_config {
136 IndexerTypeConfig::Reader {
137 reader_mode_rpc_url,
138 } => {
139 let config = crate::config::JsonRpcConfig {
140 iota_names_options: IotaNamesOptions::default(),
141 rpc_address: reader_mode_rpc_url.parse().unwrap(),
142 rpc_client_url: rpc_url,
143 };
144 let pool = store.blocking_cp();
145 tokio::spawn(async move { Indexer::start_reader(&config, ®istry, pool).await })
146 }
147 IndexerTypeConfig::Writer {
148 snapshot_config,
149 pruning_options,
150 } => {
151 let store_clone = store.clone();
152 let mut ingestion_config = IngestionConfig::default();
153 ingestion_config.sources.remote_store_url = data_ingestion_path
154 .is_none()
155 .then_some(format!("{rpc_url}/api/v1").parse().unwrap());
156 ingestion_config.sources.data_ingestion_path = data_ingestion_path;
157 ingestion_config.sources.rpc_client_url = Some(rpc_url.parse().unwrap());
158
159 init_metrics(®istry);
160 let indexer_metrics = IndexerMetrics::new(®istry);
161
162 tokio::spawn(async move {
163 Indexer::start_writer_with_config(
164 &ingestion_config,
165 store_clone,
166 indexer_metrics,
167 snapshot_config,
168 pruning_options,
169 cancel,
170 )
171 .await
172 })
173 }
174 IndexerTypeConfig::AnalyticalWorker => {
175 let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
176
177 init_metrics(®istry);
178 let indexer_metrics = IndexerMetrics::new(®istry);
179
180 tokio::spawn(
181 async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
182 )
183 }
184 };
185
186 (store, handle)
187}
188
/// Handle used by tests to create, drop and reset a single database.
pub struct TestDatabase {
    /// Connection URL of the managed database, exactly as passed to `TestDatabase::new`.
    pub url: String,
    /// Name of the managed database: the part of `url` after its last `/`.
    db_name: String,
    /// Connection to the `postgres` database at the same URL prefix; the `CREATE DATABASE` /
    /// `DROP DATABASE` statements are executed on it.
    connection: PoolConnection,
    /// Pool settings reused whenever a pool for `url` is created.
    pool_config: ConnectionPoolConfig,
}
196
197impl TestDatabase {
198 pub fn new(db_url: String) -> Self {
199 let pool_config = ConnectionPoolConfig {
202 pool_size: 5,
203 ..Default::default()
204 };
205
206 let db_name = db_url.split('/').next_back().unwrap().into();
207 let (default_url, _) = replace_db_name(&db_url, "postgres");
208 let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
209 let connection = blocking_pool.get().unwrap();
210 Self {
211 url: db_url,
212 db_name,
213 connection,
214 pool_config,
215 }
216 }
217
218 pub fn drop_if_exists(&mut self) {
220 self.connection
221 .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
222 .unwrap();
223 }
224
225 pub fn create(&mut self) {
227 self.connection
228 .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
229 .unwrap();
230 }
231
232 pub fn recreate(&mut self) {
234 self.drop_if_exists();
235 self.create();
236 }
237
238 pub fn to_connection_pool(&self) -> ConnectionPool {
240 new_connection_pool(&self.url, &self.pool_config).unwrap()
241 }
242
243 pub fn reset_db(&mut self) {
244 crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
245 }
246}
247
248pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
249 let registry = prometheus::Registry::default();
250 init_metrics(®istry);
251 let indexer_metrics = IndexerMetrics::new(®istry);
252
253 let mut test_db = TestDatabase::new(db_url.to_string());
254 if reset_database {
255 test_db.recreate();
256 }
257
258 PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
259}
260
/// Replaces the database name in `db_url` (everything after the last `/`) with `new_db_name`.
///
/// Returns `(rewritten_url, original_db_name)`.
///
/// # Panics
///
/// Panics if `db_url` contains no `/`.
fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
    let (prefix, old_db_name) = db_url
        .rsplit_once('/')
        .expect("Unable to find / in db_url");

    (format!("{prefix}/{new_db_name}"), old_db_name.to_owned())
}
270
271pub async fn force_delete_database(db_url: String) {
272 let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
277 let mut pool_config = ConnectionPoolConfig::default();
278 pool_config.set_pool_size(1);
279
280 let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
281 blocking_pool
282 .get()
283 .unwrap()
284 .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
285 .unwrap();
286}
287
/// Builds a partial copy of an existing [`IotaTransactionBlockResponse`], keeping only the
/// sections selected through the `with_*` methods.
#[derive(Clone)]
pub struct IotaTransactionBlockResponseBuilder<'a> {
    /// Response under construction; starts as `IotaTransactionBlockResponse::default()`.
    response: IotaTransactionBlockResponse,
    /// Complete response that selected sections are cloned from.
    full_response: &'a IotaTransactionBlockResponse,
}
293
294impl<'a> IotaTransactionBlockResponseBuilder<'a> {
295 pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
296 Self {
297 response: IotaTransactionBlockResponse::default(),
298 full_response,
299 }
300 }
301
302 pub fn with_input(mut self) -> Self {
303 self.response = IotaTransactionBlockResponse {
304 transaction: self.full_response.transaction.clone(),
305 ..self.response
306 };
307 self
308 }
309
310 pub fn with_raw_input(mut self) -> Self {
311 self.response = IotaTransactionBlockResponse {
312 raw_transaction: self.full_response.raw_transaction.clone(),
313 ..self.response
314 };
315 self
316 }
317
318 pub fn with_effects(mut self) -> Self {
319 self.response = IotaTransactionBlockResponse {
320 effects: self.full_response.effects.clone(),
321 ..self.response
322 };
323 self
324 }
325
326 pub fn with_events(mut self) -> Self {
327 self.response = IotaTransactionBlockResponse {
328 events: self.full_response.events.clone(),
329 ..self.response
330 };
331 self
332 }
333
334 pub fn with_balance_changes(mut self) -> Self {
335 self.response = IotaTransactionBlockResponse {
336 balance_changes: self.full_response.balance_changes.clone(),
337 ..self.response
338 };
339 self
340 }
341
342 pub fn with_object_changes(mut self) -> Self {
343 self.response = IotaTransactionBlockResponse {
344 object_changes: self.full_response.object_changes.clone(),
345 ..self.response
346 };
347 self
348 }
349
350 pub fn with_input_and_changes(mut self) -> Self {
351 self.response = IotaTransactionBlockResponse {
352 transaction: self.full_response.transaction.clone(),
353 balance_changes: self.full_response.balance_changes.clone(),
354 object_changes: self.full_response.object_changes.clone(),
355 ..self.response
356 };
357 self
358 }
359
360 pub fn build(self) -> IotaTransactionBlockResponse {
361 IotaTransactionBlockResponse {
362 transaction: self.response.transaction,
363 raw_transaction: self.response.raw_transaction,
364 effects: self.response.effects,
365 events: self.response.events,
366 balance_changes: self.response.balance_changes,
367 object_changes: self.response.object_changes,
368 ..self.full_response.clone()
370 }
371 }
372}