1use std::{net::SocketAddr, path::PathBuf};
6
7use diesel::connection::SimpleConnection;
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use secrecy::{ExposeSecret, Secret};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tracing::info;
14
15use crate::{
16 IndexerConfig, IndexerMetrics,
17 db::{ConnectionPool, ConnectionPoolConfig, PoolConnection, new_connection_pool_with_config},
18 errors::IndexerError,
19 handlers::objects_snapshot_handler::SnapshotLagConfig,
20 indexer::Indexer,
21 store::{PgIndexerAnalyticalStore, PgIndexerStore},
22};
23
/// One-shot callback that is handed a reference to the test [`PgIndexerStore`].
///
/// `start_test_indexer_impl` invokes the hook (when one is supplied) after the
/// store has been created and optionally reset, and before any indexer task is
/// spawned. The hook must be `Send` and is consumed by the call (`FnOnce`).
pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
61
/// Selects which kind of indexer task `start_test_indexer_impl` spawns.
pub enum IndexerTypeConfig {
    /// Run the indexer as a reader (RPC server).
    Reader {
        /// Address the reader's RPC server is configured with. It is parsed as
        /// a `SocketAddr` by `start_test_indexer_impl`, so it must be of the
        /// form `ip:port`.
        reader_mode_rpc_url: String,
    },
    /// Run the indexer as a writer.
    Writer {
        /// Snapshot lag configuration forwarded to
        /// `Indexer::start_writer_with_config`.
        snapshot_config: SnapshotLagConfig,
        /// Forwarded to `Indexer::start_writer_with_config`; presumably the
        /// number of epochs of data to retain (`None` = no limit) — TODO
        /// confirm against the writer implementation.
        epochs_to_keep: Option<u64>,
    },
    /// Run the analytical worker on top of the same database.
    AnalyticalWorker,
}
72
73impl IndexerTypeConfig {
74 pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
75 Self::Reader {
76 reader_mode_rpc_url,
77 }
78 }
79
80 pub fn writer_mode(
81 snapshot_config: Option<SnapshotLagConfig>,
82 epochs_to_keep: Option<u64>,
83 ) -> Self {
84 Self::Writer {
85 snapshot_config: snapshot_config.unwrap_or_default(),
86 epochs_to_keep,
87 }
88 }
89}
90
91pub async fn start_test_indexer(
92 db_url: String,
93 reset_db: bool,
94 db_init_hook: Option<DBInitHook>,
95 rpc_url: String,
96 reader_writer_config: IndexerTypeConfig,
97 data_ingestion_path: Option<PathBuf>,
98) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
99 start_test_indexer_impl(
100 db_url,
101 reset_db,
102 db_init_hook,
103 rpc_url,
104 reader_writer_config,
105 data_ingestion_path,
106 CancellationToken::new(),
107 )
108 .await
109}
110
111pub async fn start_test_indexer_impl(
114 db_url: String,
115 reset_db: bool,
116 db_init_hook: Option<DBInitHook>,
117 rpc_url: String,
118 reader_writer_config: IndexerTypeConfig,
119 data_ingestion_path: Option<PathBuf>,
120 cancel: CancellationToken,
121) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
122 let mut config = IndexerConfig {
123 db_url: Some(db_url.clone().into()),
124 remote_store_url: data_ingestion_path
126 .is_none()
127 .then_some(format!("{rpc_url}/api/v1")),
128 rpc_client_url: rpc_url,
129 reset_db,
130 fullnode_sync_worker: true,
131 rpc_server_worker: false,
132 data_ingestion_path,
133 ..Default::default()
134 };
135
136 let store = create_pg_store(config.get_db_url().unwrap(), reset_db);
137 if config.reset_db {
138 crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
139 }
140 if let Some(db_init_hook) = db_init_hook {
141 db_init_hook(&store);
142 }
143
144 let registry = prometheus::Registry::default();
145 let handle = match reader_writer_config {
146 IndexerTypeConfig::Reader {
147 reader_mode_rpc_url,
148 } => {
149 let reader_mode_rpc_url = reader_mode_rpc_url
150 .parse::<SocketAddr>()
151 .expect("Unable to parse fullnode address");
152 config.fullnode_sync_worker = false;
153 config.rpc_server_worker = true;
154 config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
155 config.rpc_server_port = reader_mode_rpc_url.port();
156 tokio::spawn(async move { Indexer::start_reader(&config, ®istry, db_url).await })
157 }
158 IndexerTypeConfig::Writer {
159 snapshot_config,
160 epochs_to_keep,
161 } => {
162 let store_clone = store.clone();
163
164 init_metrics(®istry);
165 let indexer_metrics = IndexerMetrics::new(®istry);
166
167 tokio::spawn(async move {
168 Indexer::start_writer_with_config(
169 &config,
170 store_clone,
171 indexer_metrics,
172 snapshot_config,
173 epochs_to_keep,
174 cancel,
175 )
176 .await
177 })
178 }
179 IndexerTypeConfig::AnalyticalWorker => {
180 let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
181
182 init_metrics(®istry);
183 let indexer_metrics = IndexerMetrics::new(®istry);
184
185 tokio::spawn(
186 async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
187 )
188 }
189 };
190
191 (store, handle)
192}
193
/// Handle used by tests to create, drop and reset a single database.
pub struct TestDatabase {
    /// Full connection URL of the test database (kept secret-wrapped).
    pub url: Secret<String>,
    /// Name of the test database: the text after the last `/` of `url`.
    db_name: String,
    /// Connection opened against the `postgres` database reached via the same
    /// URL prefix; `CREATE DATABASE` / `DROP DATABASE` are issued through it.
    connection: PoolConnection,
    /// Pool settings reused whenever a pool to the test database is built.
    pool_config: ConnectionPoolConfig,
}
201
202impl TestDatabase {
203 pub fn new(db_url: Secret<String>) -> Self {
204 let pool_config = ConnectionPoolConfig::default();
205 let db_name = db_url
206 .expose_secret()
207 .split('/')
208 .next_back()
209 .unwrap()
210 .into();
211 let (default_url, _) = replace_db_name(db_url.expose_secret(), "postgres");
212 let blocking_pool =
213 new_connection_pool_with_config(&default_url, Some(5), pool_config).unwrap();
214 let connection = blocking_pool.get().unwrap();
215 Self {
216 url: db_url,
217 db_name,
218 connection,
219 pool_config,
220 }
221 }
222
223 pub fn drop_if_exists(&mut self) {
225 self.connection
226 .batch_execute(&format!("DROP DATABASE IF EXISTS {}", self.db_name))
227 .unwrap();
228 }
229
230 pub fn create(&mut self) {
232 self.connection
233 .batch_execute(&format!("CREATE DATABASE {}", self.db_name))
234 .unwrap();
235 }
236
237 pub fn recreate(&mut self) {
239 self.drop_if_exists();
240 self.create();
241 }
242
243 pub fn to_connection_pool(&self) -> ConnectionPool {
245 new_connection_pool_with_config(self.url.expose_secret(), Some(5), self.pool_config)
246 .unwrap()
247 }
248
249 pub fn reset_db(&mut self) {
250 crate::db::reset_database(&mut self.to_connection_pool().get().unwrap()).unwrap();
251 }
252}
253
254pub fn create_pg_store(db_url: Secret<String>, reset_database: bool) -> PgIndexerStore {
255 info!("Setting DB_POOL_SIZE to 10");
258 std::env::set_var("DB_POOL_SIZE", "10");
259
260 let registry = prometheus::Registry::default();
261
262 init_metrics(®istry);
263
264 let indexer_metrics = IndexerMetrics::new(®istry);
265
266 let mut test_db = TestDatabase::new(db_url);
267 if reset_database {
268 test_db.recreate();
269 }
270
271 PgIndexerStore::new(test_db.to_connection_pool(), indexer_metrics.clone())
272}
273
/// Swaps the database name at the end of a connection URL.
///
/// The database name is the text after the last `/` of `db_url`, up to an
/// optional `?` that starts the connection parameters. Parameters, when
/// present, are carried over to the rewritten URL and are not part of the
/// returned name.
///
/// Returns `(url_with_new_db_name, old_db_name)`.
///
/// # Panics
///
/// Panics if `db_url` contains no `/`.
fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
    let (base, tail) = db_url
        .rsplit_once('/')
        .expect("Unable to find / in db_url");

    // `tail` is `dbname` or `dbname?param=value&...`.
    let (old_db_name, new_url) = match tail.split_once('?') {
        Some((name, params)) => (name, format!("{base}/{new_db_name}?{params}")),
        None => (tail, format!("{base}/{new_db_name}")),
    };

    (new_url, old_db_name.to_string())
}
283
284pub async fn force_delete_database(db_url: String) {
285 let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
290 let pool_config = ConnectionPoolConfig::default();
291
292 let blocking_pool =
293 new_connection_pool_with_config(&default_db_url, Some(5), pool_config).unwrap();
294 blocking_pool
295 .get()
296 .unwrap()
297 .batch_execute(&format!("DROP DATABASE IF EXISTS {} WITH (FORCE)", db_name))
298 .unwrap();
299}
300
/// Builds an [`IotaTransactionBlockResponse`] that carries only selected
/// optional parts of a complete response.
///
/// Each `with_*` method copies one part from `full_response` into `response`.
#[derive(Clone)]
pub struct IotaTransactionBlockResponseBuilder<'a> {
    /// Accumulates the parts selected so far; starts out as `Default`.
    response: IotaTransactionBlockResponse,
    /// The complete response the selected parts are copied from.
    full_response: &'a IotaTransactionBlockResponse,
}
306
307impl<'a> IotaTransactionBlockResponseBuilder<'a> {
308 pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
309 Self {
310 response: IotaTransactionBlockResponse::default(),
311 full_response,
312 }
313 }
314
315 pub fn with_input(mut self) -> Self {
316 self.response = IotaTransactionBlockResponse {
317 transaction: self.full_response.transaction.clone(),
318 ..self.response
319 };
320 self
321 }
322
323 pub fn with_raw_input(mut self) -> Self {
324 self.response = IotaTransactionBlockResponse {
325 raw_transaction: self.full_response.raw_transaction.clone(),
326 ..self.response
327 };
328 self
329 }
330
331 pub fn with_effects(mut self) -> Self {
332 self.response = IotaTransactionBlockResponse {
333 effects: self.full_response.effects.clone(),
334 ..self.response
335 };
336 self
337 }
338
339 pub fn with_events(mut self) -> Self {
340 self.response = IotaTransactionBlockResponse {
341 events: self.full_response.events.clone(),
342 ..self.response
343 };
344 self
345 }
346
347 pub fn with_balance_changes(mut self) -> Self {
348 self.response = IotaTransactionBlockResponse {
349 balance_changes: self.full_response.balance_changes.clone(),
350 ..self.response
351 };
352 self
353 }
354
355 pub fn with_object_changes(mut self) -> Self {
356 self.response = IotaTransactionBlockResponse {
357 object_changes: self.full_response.object_changes.clone(),
358 ..self.response
359 };
360 self
361 }
362
363 pub fn with_input_and_changes(mut self) -> Self {
364 self.response = IotaTransactionBlockResponse {
365 transaction: self.full_response.transaction.clone(),
366 balance_changes: self.full_response.balance_changes.clone(),
367 object_changes: self.full_response.object_changes.clone(),
368 ..self.response
369 };
370 self
371 }
372
373 pub fn build(self) -> IotaTransactionBlockResponse {
374 IotaTransactionBlockResponse {
375 transaction: self.response.transaction,
376 raw_transaction: self.response.raw_transaction,
377 effects: self.response.effects,
378 events: self.response.events,
379 balance_changes: self.response.balance_changes,
380 object_changes: self.response.object_changes,
381 ..self.full_response.clone()
383 }
384 }
385}