1use std::{net::SocketAddr, path::PathBuf};
6
7use diesel::connection::SimpleConnection;
8use iota_json_rpc_types::IotaTransactionBlockResponse;
9use iota_metrics::init_metrics;
10use secrecy::{ExposeSecret, Secret};
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13use tracing::info;
14
15use crate::{
16 IndexerConfig, IndexerMetrics,
17 db::{ConnectionPoolConfig, new_connection_pool_with_config},
18 errors::IndexerError,
19 handlers::objects_snapshot_handler::SnapshotLagConfig,
20 indexer::Indexer,
21 store::{PgIndexerAnalyticalStore, PgIndexerStore},
22};
23
24pub type DBInitHook = Box<dyn FnOnce(&PgIndexerStore) + Send>;
61
62pub enum IndexerTypeConfig {
63 Reader {
64 reader_mode_rpc_url: String,
65 },
66 Writer {
67 snapshot_config: SnapshotLagConfig,
68 epochs_to_keep: Option<u64>,
69 },
70 AnalyticalWorker,
71}
72
73impl IndexerTypeConfig {
74 pub fn reader_mode(reader_mode_rpc_url: String) -> Self {
75 Self::Reader {
76 reader_mode_rpc_url,
77 }
78 }
79
80 pub fn writer_mode(
81 snapshot_config: Option<SnapshotLagConfig>,
82 epochs_to_keep: Option<u64>,
83 ) -> Self {
84 Self::Writer {
85 snapshot_config: snapshot_config.unwrap_or_default(),
86 epochs_to_keep,
87 }
88 }
89}
90
91pub async fn start_test_indexer(
92 db_url: String,
93 reset_db: bool,
94 db_init_hook: Option<DBInitHook>,
95 rpc_url: String,
96 reader_writer_config: IndexerTypeConfig,
97 data_ingestion_path: Option<PathBuf>,
98) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
99 start_test_indexer_impl(
100 db_url,
101 reset_db,
102 db_init_hook,
103 rpc_url,
104 reader_writer_config,
105 data_ingestion_path,
106 CancellationToken::new(),
107 )
108 .await
109}
110
111pub async fn start_test_indexer_impl(
114 db_url: String,
115 reset_db: bool,
116 db_init_hook: Option<DBInitHook>,
117 rpc_url: String,
118 reader_writer_config: IndexerTypeConfig,
119 data_ingestion_path: Option<PathBuf>,
120 cancel: CancellationToken,
121) -> (PgIndexerStore, JoinHandle<Result<(), IndexerError>>) {
122 let mut config = IndexerConfig {
123 db_url: Some(db_url.clone().into()),
124 remote_store_url: data_ingestion_path
126 .is_none()
127 .then_some(format!("{rpc_url}/api/v1")),
128 rpc_client_url: rpc_url,
129 reset_db,
130 fullnode_sync_worker: true,
131 rpc_server_worker: false,
132 data_ingestion_path,
133 ..Default::default()
134 };
135
136 let store = create_pg_store(config.get_db_url().unwrap(), reset_db);
137 if config.reset_db {
138 crate::db::reset_database(&mut store.blocking_cp().get().unwrap()).unwrap();
139 }
140 if let Some(db_init_hook) = db_init_hook {
141 db_init_hook(&store);
142 }
143
144 let registry = prometheus::Registry::default();
145 let handle = match reader_writer_config {
146 IndexerTypeConfig::Reader {
147 reader_mode_rpc_url,
148 } => {
149 let reader_mode_rpc_url = reader_mode_rpc_url
150 .parse::<SocketAddr>()
151 .expect("Unable to parse fullnode address");
152 config.fullnode_sync_worker = false;
153 config.rpc_server_worker = true;
154 config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
155 config.rpc_server_port = reader_mode_rpc_url.port();
156 tokio::spawn(async move { Indexer::start_reader(&config, ®istry, db_url).await })
157 }
158 IndexerTypeConfig::Writer {
159 snapshot_config,
160 epochs_to_keep,
161 } => {
162 let store_clone = store.clone();
163
164 init_metrics(®istry);
165 let indexer_metrics = IndexerMetrics::new(®istry);
166
167 tokio::spawn(async move {
168 Indexer::start_writer_with_config(
169 &config,
170 store_clone,
171 indexer_metrics,
172 snapshot_config,
173 epochs_to_keep,
174 cancel,
175 )
176 .await
177 })
178 }
179 IndexerTypeConfig::AnalyticalWorker => {
180 let store = PgIndexerAnalyticalStore::new(store.blocking_cp());
181
182 init_metrics(®istry);
183 let indexer_metrics = IndexerMetrics::new(®istry);
184
185 tokio::spawn(
186 async move { Indexer::start_analytical_worker(store, indexer_metrics).await },
187 )
188 }
189 };
190
191 (store, handle)
192}
193
194pub fn create_pg_store(db_url: Secret<String>, reset_database: bool) -> PgIndexerStore {
195 info!("Setting DB_POOL_SIZE to 10");
198 std::env::set_var("DB_POOL_SIZE", "10");
199
200 let pool_config = ConnectionPoolConfig::default();
202
203 let registry = prometheus::Registry::default();
204
205 init_metrics(®istry);
206
207 let indexer_metrics = IndexerMetrics::new(®istry);
208
209 let mut parsed_url = db_url.clone();
210 if reset_database {
211 let db_name = parsed_url.expose_secret().split('/').next_back().unwrap();
212 let (default_db_url, _) = replace_db_name(parsed_url.expose_secret(), "postgres");
214
215 let blocking_pool =
217 new_connection_pool_with_config(&default_db_url, Some(5), pool_config).unwrap();
218 let mut default_conn = blocking_pool.get().unwrap();
219
220 default_conn
222 .batch_execute(&format!("DROP DATABASE IF EXISTS {}", db_name))
223 .unwrap();
224
225 default_conn
227 .batch_execute(&format!("CREATE DATABASE {}", db_name))
228 .unwrap();
229 parsed_url = replace_db_name(parsed_url.expose_secret(), db_name)
230 .0
231 .into();
232 }
233
234 let blocking_pool =
235 new_connection_pool_with_config(parsed_url.expose_secret(), Some(5), pool_config).unwrap();
236 PgIndexerStore::new(blocking_pool.clone(), indexer_metrics.clone())
237}
238
239fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) {
240 let pos = db_url.rfind('/').expect("Unable to find / in db_url");
241 let old_db_name = &db_url[pos + 1..];
242
243 (
244 format!("{}/{}", &db_url[..pos], new_db_name),
245 old_db_name.to_string(),
246 )
247}
248
249pub async fn force_delete_database(db_url: String) {
250 let (default_db_url, db_name) = replace_db_name(&db_url, "postgres");
255 let pool_config = ConnectionPoolConfig::default();
256
257 let blocking_pool =
258 new_connection_pool_with_config(&default_db_url, Some(5), pool_config).unwrap();
259 blocking_pool
260 .get()
261 .unwrap()
262 .batch_execute(&format!("DROP DATABASE IF EXISTS {} WITH (FORCE)", db_name))
263 .unwrap();
264}
265
266#[derive(Clone)]
267pub struct IotaTransactionBlockResponseBuilder<'a> {
268 response: IotaTransactionBlockResponse,
269 full_response: &'a IotaTransactionBlockResponse,
270}
271
272impl<'a> IotaTransactionBlockResponseBuilder<'a> {
273 pub fn new(full_response: &'a IotaTransactionBlockResponse) -> Self {
274 Self {
275 response: IotaTransactionBlockResponse::default(),
276 full_response,
277 }
278 }
279
280 pub fn with_input(mut self) -> Self {
281 self.response = IotaTransactionBlockResponse {
282 transaction: self.full_response.transaction.clone(),
283 ..self.response
284 };
285 self
286 }
287
288 pub fn with_raw_input(mut self) -> Self {
289 self.response = IotaTransactionBlockResponse {
290 raw_transaction: self.full_response.raw_transaction.clone(),
291 ..self.response
292 };
293 self
294 }
295
296 pub fn with_effects(mut self) -> Self {
297 self.response = IotaTransactionBlockResponse {
298 effects: self.full_response.effects.clone(),
299 ..self.response
300 };
301 self
302 }
303
304 pub fn with_events(mut self) -> Self {
305 self.response = IotaTransactionBlockResponse {
306 events: self.full_response.events.clone(),
307 ..self.response
308 };
309 self
310 }
311
312 pub fn with_balance_changes(mut self) -> Self {
313 self.response = IotaTransactionBlockResponse {
314 balance_changes: self.full_response.balance_changes.clone(),
315 ..self.response
316 };
317 self
318 }
319
320 pub fn with_object_changes(mut self) -> Self {
321 self.response = IotaTransactionBlockResponse {
322 object_changes: self.full_response.object_changes.clone(),
323 ..self.response
324 };
325 self
326 }
327
328 pub fn with_input_and_changes(mut self) -> Self {
329 self.response = IotaTransactionBlockResponse {
330 transaction: self.full_response.transaction.clone(),
331 balance_changes: self.full_response.balance_changes.clone(),
332 object_changes: self.full_response.object_changes.clone(),
333 ..self.response
334 };
335 self
336 }
337
338 pub fn build(self) -> IotaTransactionBlockResponse {
339 IotaTransactionBlockResponse {
340 transaction: self.response.transaction,
341 raw_transaction: self.response.raw_transaction,
342 effects: self.response.effects,
343 events: self.response.events,
344 balance_changes: self.response.balance_changes,
345 object_changes: self.response.object_changes,
346 ..self.full_response.clone()
348 }
349 }
350}