1use std::path::PathBuf;
6
7use diesel::{QueryableByName, connection::SimpleConnection, sql_types::BigInt};
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::{
14 IndexerMetrics,
15 config::{IngestionConfig, IotaNamesOptions, RetentionConfig, SnapshotLagConfig},
16 db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool},
17 errors::IndexerError,
18 indexer::Indexer,
19 store::{PgIndexerAnalyticalStore, PgIndexerStore},
20};
21
22pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
59
60pub enum IndexerTypeConfig {
61 Reader {
62 reader_mode_rpc_url: String,
63 },
64 Writer {
65 snapshot_config: SnapshotLagConfig,
66 retention_config: Option<RetentionConfig>,
67 },
68 AnalyticalWorker,
69}
70
71impl IndexerTypeConfig {
72 pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
73 Self::Reader {
74 reader_mode_rpc_url,
75 }
76 }
77
78 pub fn writer_mode(
79 snapshot_config: Option<SnapshotLagConfig>,
80 epochs_to_keep: Option<u64>,
81 ) -> Self {
82 Self::Writer {
83 snapshot_config: snapshot_config.unwrap_or_default(),
84 retention_config: epochs_to_keep
85 .map(RetentionConfig::new_with_default_retention_only_for_testing),
86 }
87 }
88}
89
90pub async fn start_test_indexer(
91 db_url: String,
92 reset_db: bool,
93 db_init_hook: Option<DBInitHook>,
94 rpc_url: String,
95 reader_writer_config: IndexerTypeConfig,
96 data_ingestion_path: Option<PathBuf>,
97) -> (
98 PgIndexerStore,
99 JoinHandle<Result<(), IndexerError>>,
100 CancellationToken,
101) {
102 let token = CancellationToken::new();
103 let (store, handle) = start_test_indexer_impl(
104 db_url,
105 reset_db,
106 db_init_hook,
107 rpc_url,
108 reader_writer_config,
109 data_ingestion_path,
110 token.clone(),
111 )
112 .await;
113 (store, handle, token)
114}
115
116pub async fn start_test_indexer_impl(
119 db_url: String,
120 reset_db: bool,
121 db_init_hook: Option<DBInitHook>,
122 rpc_url: String,
123 reader_writer_config: IndexerTypeConfig,
124 data_ingestion_path: Option<PathBuf>,
125 cancel: CancellationToken,
126) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
127 let store = create_pg_store(&db_url, reset_db);
128 if reset_db {
129 crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
130 }
131 if let Some(db_init_hook) = db_init_hook {
132 db_init_hook(&store);
133 }
134
135 let registry = prometheus::Registry::default();
136 init_metrics(®istry);
137 let indexer_metrics = IndexerMetrics::new(®istry);
138
139 let handle = match reader_writer_config {
140 IndexerTypeConfig::Reader {
141 reader_mode_rpc_url,
142 } => {
143 let config = crate::config::JsonRpcConfig {
144 iota_names_options: IotaNamesOptions::default(),
145 rpc_address: reader_mode_rpc_url.parse().unwrap(),
146 rpc_client_url: rpc_url,
147 };
148 let pool = store.blocking_cp();
149 let store_clone = store.clone();
150 tokio::spawn(async move {
151 Indexer::start_reader(&config, store_clone, ®istry, pool, indexer_metrics).await
152 })
153 }
154 IndexerTypeConfig::Writer {
155 snapshot_config,
156 retention_config,
157 } => {
158 let store_clone = store.clone();
159 let mut ingestion_config = IngestionConfig::default();
160 ingestion_config.sources.remote_store_url = data_ingestion_path
161 .is_none()
162 .then_some(format!("{rpc_url}/api/v1").parse().unwrap());
163 ingestion_config.sources.data_ingestion_path = data_ingestion_path;
164 ingestion_config.sources.rpc_client_url = Some(rpc_url.parse().unwrap());
165
166 tokio::spawn(async move {
167 Indexer::start_writer_with_config(
168 &ingestion_config,
169 store_clone,
170 indexer_metrics,
171 snapshot_config,
172 retention_config,
173 cancel,
174 )
175 .await
176 })
177 }
178 IndexerTypeConfig::AnalyticalWorker => {
179 let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
180
181 tokio::spawn(
182 async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
183 )
184 }
185 };
186
187 (store, handle)
188}
189
190pub struct TestDatabase {
192 pub url: String,
193 db_name: String,
194 connection: PoolConnection,
195 pool_config: ConnectionPoolConfig,
196}
197
198impl TestDatabase {
199 pub fn new(db_url: String) -> Self {
200 let pool_config = ConnectionPoolConfig {
203 pool_size: 5,
204 ..Default::default()
205 };
206
207 let db_name = db_url.split('/').next_back().unwrap().into();
208 let (default_url, _) = replace_db_name(&db_url, "postgres");
209 let blocking_pool = new_connection_pool(&default_url, &pool_config).unwrap();
210 let connection = blocking_pool.get().unwrap();
211 Self {
212 url: db_url,
213 db_name,
214 connection,
215 pool_config,
216 }
217 }
218
219 pub fn drop_if_exists(&mut self) {
221 self.connection
222 .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
223 .unwrap();
224 }
225
226 pub fn create(&mut self) {
228 self.connection
229 .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
230 .unwrap();
231 }
232
233 pub fn recreate(&mut self) {
235 self.drop_if_exists();
236 self.create();
237 }
238
239 pub fn to_connection_pool(&self) -> ConnectionPool {
241 new_connection_pool(&self.url, &self.pool_config).unwrap()
242 }
243
244 pub fn reset_db(&mut self) {
245 crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
246 }
247}
248
249pub fn create_pg_store(db_url: &str, reset_database: bool) -> PgIndexerStore {
250 let registry = prometheus::Registry::default();
251 init_metrics(®istry);
252 let indexer_metrics = IndexerMetrics::new(®istry);
253
254 let mut test_db = TestDatabase::new(db_url.to_string());
255 if reset_database {
256 test_db.recreate();
257 }
258
259 PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
260}
261
262fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
263 let pos = db_url.rfind('/').expect("Unable to find / in db_url");
264 let old_db_name = &db_url[pos + 1..];
265
266 (
267 format!("{}/{}", &db_url[..pos], new_db_name),
268 old_db_name.to_string(),
269 )
270}
271
272pub async fn force_delete_database(db_url: String) {
273 let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
278 let mut pool_config = ConnectionPoolConfig::default();
279 pool_config.set_pool_size(1);
280
281 let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap();
282 blocking_pool
283 .get()
284 .unwrap()
285 .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name} WITH (FORCE)"))
286 .unwrap();
287}
288
289#[derive(Clone)]
290pub struct IotaTransactionBlockResponseBuilder<'a> {
291 response: IotaTransactionBlockResponse,
292 full_response: &'a IotaTransactionBlockResponse,
293}
294
295impl<'a> IotaTransactionBlockResponseBuilder<'a> {
296 pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
297 Self {
298 response: IotaTransactionBlockResponse::default(),
299 full_response,
300 }
301 }
302
303 pub fn with_input(mut self) -> Self {
304 self.response = IotaTransactionBlockResponse {
305 transaction: self.full_response.transaction.clone(),
306 ..self.response
307 };
308 self
309 }
310
311 pub fn with_raw_input(mut self) -> Self {
312 self.response = IotaTransactionBlockResponse {
313 raw_transaction: self.full_response.raw_transaction.clone(),
314 ..self.response
315 };
316 self
317 }
318
319 pub fn with_effects(mut self) -> Self {
320 self.response = IotaTransactionBlockResponse {
321 effects: self.full_response.effects.clone(),
322 ..self.response
323 };
324 self
325 }
326
327 pub fn with_events(mut self) -> Self {
328 self.response = IotaTransactionBlockResponse {
329 events: self.full_response.events.clone(),
330 ..self.response
331 };
332 self
333 }
334
335 pub fn with_balance_changes(mut self) -> Self {
336 self.response = IotaTransactionBlockResponse {
337 balance_changes: self.full_response.balance_changes.clone(),
338 ..self.response
339 };
340 self
341 }
342
343 pub fn with_object_changes(mut self) -> Self {
344 self.response = IotaTransactionBlockResponse {
345 object_changes: self.full_response.object_changes.clone(),
346 ..self.response
347 };
348 self
349 }
350
351 pub fn with_input_and_changes(mut self) -> Self {
352 self.response = IotaTransactionBlockResponse {
353 transaction: self.full_response.transaction.clone(),
354 balance_changes: self.full_response.balance_changes.clone(),
355 object_changes: self.full_response.object_changes.clone(),
356 ..self.response
357 };
358 self
359 }
360
361 pub fn build(self) -> IotaTransactionBlockResponse {
362 IotaTransactionBlockResponse {
363 transaction: self.response.transaction,
364 raw_transaction: self.response.raw_transaction,
365 effects: self.response.effects,
366 events: self.response.events,
367 balance_changes: self.response.balance_changes,
368 object_changes: self.response.object_changes,
369 ..self.full_response.clone()
371 }
372 }
373}
374
375pub fn db_url(db_name: &str) -> String {
379 format!("postgres://postgres:postgrespw@localhost:5432/{db_name}")
380}
381
382#[derive(QueryableByName, Debug)]
384pub struct RowCount {
385 #[diesel(sql_type = BigInt)]
386 pub cnt: i64,
387}