iota_indexer/store/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
pub(crate) use indexer_store::*;
pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
pub use pg_indexer_store::PgIndexerStore;

mod indexer_analytics_store;
pub mod indexer_store;
pub mod package_resolver;
mod pg_indexer_analytical_store;
mod pg_indexer_store;
pub mod pg_partition_manager;

pub mod diesel_macro {
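    // Thread-local flag marking that the current thread belongs to tokio's
    // blocking pool. `spawn_read_only_blocking!` sets it before running a
    // query, and `blocking_call_is_ok_or_panic!` reads it to reject blocking
    // DB calls made directly from an async thread.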
    thread_local! {
        pub static CALLED_FROM_BLOCKING_POOL: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
    }

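    /// Runs `$query` on a pooled connection inside a read-only transaction at
    /// the `REPEATABLE READ` isolation level, mapping any failure to
    /// `IndexerError::PostgresRead`. Blocks the calling thread; see
    /// `read_only_blocking!` for the default-isolation variant.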
    #[macro_export]
    macro_rules! read_only_repeatable_blocking {
        ($pool:expr, $query:expr) => {{
            use downcast::Any;
            use $crate::db::{PoolConnection, get_pool_connection};

            let mut pool_conn = get_pool_connection($pool)?;
            pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .unwrap()
                .build_transaction()
                .read_only()
                .repeatable_read()
                .run($query)
                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
        }};
    }

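    /// Runs `$query` on a pooled connection inside a read-only transaction at
    /// the default isolation level, mapping any failure to
    /// `IndexerError::PostgresRead`.
    ///
    /// # Examples
    ///
    /// A minimal sketch; the pool handle, schema, and count query below are
    /// illustrative, not part of this module:
    ///
    /// ```rust,ignore
    /// use diesel::prelude::*;
    ///
    /// let row_count: i64 = read_only_blocking!(&blocking_pool, |conn| {
    ///     transactions::table.count().get_result::<i64>(conn)
    /// })?;
    /// ```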
    #[macro_export]
    macro_rules! read_only_blocking {
        ($pool:expr, $query:expr) => {{
            use downcast::Any;
            use $crate::db::{PoolConnection, get_pool_connection};

            let mut pool_conn = get_pool_connection($pool)?;
            pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .unwrap()
                .build_transaction()
                .read_only()
                .run($query)
                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
        }};
    }

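    /// Runs `$query` in a read-write transaction, retrying failed attempts
    /// with exponential backoff until `$max_elapsed` has elapsed. Connection
    /// and query errors are treated as transient and surfaced as
    /// `IndexerError::PostgresWrite` once the retries are exhausted.
    ///
    /// # Examples
    ///
    /// A minimal sketch; the pool handle, table, and rows are illustrative:
    ///
    /// ```rust,ignore
    /// transactional_blocking_with_retry!(
    ///     &blocking_pool,
    ///     |conn| {
    ///         insert_or_ignore_into!(events::table, &event_rows, conn);
    ///         Ok::<(), IndexerError>(())
    ///     },
    ///     PG_DB_COMMIT_SLEEP_DURATION
    /// )?;
    /// ```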
    #[macro_export]
    macro_rules! transactional_blocking_with_retry {
        ($pool:expr, $query:expr, $max_elapsed:expr) => {{
            use downcast::Any;
            use $crate::{
                db::{PoolConnection, get_pool_connection},
                errors::IndexerError,
            };
            let mut backoff = backoff::ExponentialBackoff::default();
            backoff.max_elapsed_time = Some($max_elapsed);
            let result = match backoff::retry(backoff, || {
                let mut pool_conn =
                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                        err: IndexerError::PostgresWrite(e.to_string()),
                        retry_after: None,
                    })?;
                pool_conn
                    .as_any_mut()
                    .downcast_mut::<PoolConnection>()
                    .unwrap()
                    .build_transaction()
                    .read_write()
                    .run($query)
                    .map_err(|e| {
                        tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
                        backoff::Error::Transient {
                            err: IndexerError::PostgresWrite(e.to_string()),
                            retry_after: None,
                        }
                    })
            }) {
                Ok(v) => Ok(v),
                Err(backoff::Error::Transient { err, .. }) => Err(err),
                Err(backoff::Error::Permanent(err)) => Err(err),
            };

            result
        }};
    }

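    /// Variant of `transactional_blocking_with_retry!` that consults
    /// `$abort_condition` (a predicate over the query error) on each failure:
    /// if the predicate returns `true` the error is treated as permanent and
    /// retries stop immediately; otherwise the attempt is retried with
    /// exponential backoff until `$max_elapsed` has elapsed.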
    #[macro_export]
    macro_rules! transactional_blocking_with_retry_with_conditional_abort {
        ($pool:expr, $query:expr, $abort_condition:expr, $max_elapsed:expr) => {{
            use downcast::Any;
            use $crate::{
                db::{PoolConnection, get_pool_connection},
                errors::IndexerError,
            };
            let mut backoff = backoff::ExponentialBackoff::default();
            backoff.max_elapsed_time = Some($max_elapsed);
            let result = match backoff::retry(backoff, || {
                let mut pool_conn =
                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                        err: IndexerError::PostgresWrite(e.to_string()),
                        retry_after: None,
                    })?;
                pool_conn
                    .as_any_mut()
                    .downcast_mut::<PoolConnection>()
                    .unwrap()
                    .build_transaction()
                    .read_write()
                    .run($query)
                    .map_err(|e| {
                        tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
                        if $abort_condition(&e) {
                            backoff::Error::Permanent(e)
                        } else {
                            backoff::Error::Transient {
                                err: IndexerError::PostgresWrite(e.to_string()),
                                retry_after: None,
                            }
                        }
                    })
            }) {
                Ok(v) => Ok(v),
                Err(backoff::Error::Transient { err, .. }) => Err(err),
                Err(backoff::Error::Permanent(err)) => Err(err),
            };

            result
        }};
    }

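    /// Moves a read-only query onto tokio's blocking thread pool via
    /// `tokio::task::spawn_blocking`, marking the worker thread through
    /// `CALLED_FROM_BLOCKING_POOL` so that `blocking_call_is_ok_or_panic!`
    /// accepts nested blocking calls. Pass `true` for `$repeatable_read` to
    /// run the transaction at the `REPEATABLE READ` isolation level. The
    /// expansion awaits the spawned task, so it must be used in an async
    /// context.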
    #[macro_export]
    macro_rules! spawn_read_only_blocking {
        ($pool:expr, $query:expr, $repeatable_read:expr) => {{
            use downcast::Any;
            use $crate::{
                db::{PoolConnection, get_pool_connection},
                errors::IndexerError,
                store::diesel_macro::CALLED_FROM_BLOCKING_POOL,
            };
            let current_span = tracing::Span::current();
            tokio::task::spawn_blocking(move || {
                CALLED_FROM_BLOCKING_POOL
                    .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
                let _guard = current_span.enter();
                let mut pool_conn = get_pool_connection($pool).unwrap();

                if $repeatable_read {
                    pool_conn
                        .as_any_mut()
                        .downcast_mut::<PoolConnection>()
                        .unwrap()
                        .build_transaction()
                        .read_only()
                        .repeatable_read()
                        .run($query)
                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
                } else {
                    pool_conn
                        .as_any_mut()
                        .downcast_mut::<PoolConnection>()
                        .unwrap()
                        .build_transaction()
                        .read_only()
                        .run($query)
                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
                }
            })
            .await
            .expect("Blocking call failed")
        }};
    }

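    /// Inserts `$values` into `$table` on an existing connection using
    /// `INSERT ... ON CONFLICT DO NOTHING`, so rows that already exist are
    /// silently skipped. Errors are wrapped in `IndexerError` with a
    /// table-specific context message and propagated with `?`.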
    #[macro_export]
    macro_rules! insert_or_ignore_into {
        ($table:expr, $values:expr, $conn:expr) => {{
            use diesel::RunQueryDsl;
            let error_message = concat!("Failed to write to ", stringify!($table), " DB");

            diesel::insert_into($table)
                .values($values)
                .on_conflict_do_nothing()
                .execute($conn)
                .map_err(IndexerError::from)
                .context(error_message)?;
        }};
    }

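    /// Upserts `$values` into `$table`: on a conflict with `$target`, the
    /// existing row is updated with `$pg_columns` via
    /// `ON CONFLICT ... DO UPDATE SET`.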
    #[macro_export]
    macro_rules! on_conflict_do_update {
        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
            use diesel::{ExpressionMethods, RunQueryDsl};

            diesel::insert_into($table)
                .values($values)
                .on_conflict($target)
                .do_update()
                .set($pg_columns)
                .execute($conn)?;
        }};
    }

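    /// Like `on_conflict_do_update!`, but applies the update only when
    /// `$condition` holds, producing `ON CONFLICT ... DO UPDATE SET ... WHERE`
    /// so conflicting rows that fail the predicate are left untouched.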
    #[macro_export]
    macro_rules! on_conflict_do_update_with_condition {
        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
            use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};

            diesel::insert_into($table)
                .values($values)
                .on_conflict($target)
                .do_update()
                .set($pg_columns)
                .filter($condition)
                .execute($conn)?;
        }};
    }

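    /// Runs a read-only query on the current thread, first asserting via
    /// `blocking_call_is_ok_or_panic!` that blocking here is allowed (i.e. we
    /// are not on an async tokio thread outside the blocking pool).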
    #[macro_export]
    macro_rules! run_query {
        ($pool:expr, $query:expr) => {{
            blocking_call_is_ok_or_panic!();
            read_only_blocking!($pool, $query)
        }};
    }

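    /// Same as `run_query!`, but runs the transaction at the `REPEATABLE READ`
    /// isolation level.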
    #[macro_export]
    macro_rules! run_query_repeatable {
        ($pool:expr, $query:expr) => {{
            blocking_call_is_ok_or_panic!();
            read_only_repeatable_blocking!($pool, $query)
        }};
    }

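    /// Async counterpart of `run_query!`: dispatches the read-only query to
    /// the blocking pool via `spawn_read_only_blocking!` and awaits the
    /// result, so no manual `spawn_blocking` wrapping is needed at the call
    /// site.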
    #[macro_export]
    macro_rules! run_query_async {
        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, false) }};
    }

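    /// Same as `run_query_async!`, but runs the transaction at the
    /// `REPEATABLE READ` isolation level.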
    #[macro_export]
    macro_rules! run_query_repeatable_async {
        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, true) }};
    }

    /// Check that we are in a context where making a blocking call is safe.
    /// A blocking call is allowed when either:
    /// - we are not inside a tokio runtime context, or
    /// - we are inside a tokio runtime context, but the call went through
    ///   `IndexerReader::spawn_blocking`, which moves the blocking call to a
    ///   blocking thread pool.
    ///
    /// If neither condition holds, the macro panics.
    #[macro_export]
    macro_rules! blocking_call_is_ok_or_panic {
        () => {{
            use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;
            if tokio::runtime::Handle::try_current().is_ok()
                && !CALLED_FROM_BLOCKING_POOL.with(|in_blocking_pool| *in_blocking_pool.borrow())
            {
                panic!(
                    "You are calling a blocking DB operation directly on an async thread. \
                        Please use IndexerReader::spawn_blocking instead to move the \
                        operation to a blocking thread"
                );
            }
        }};
    }

    /// This macro provides a standardized way to bulk insert data into database
    /// tables with built-in performance monitoring, error handling, and
    /// retry logic. It automatically subdivides large chunks into smaller
    /// batches to optimize database performance and avoid overwhelming
    /// individual transactions.
    ///
    /// # Parameters
    ///
    /// * `$table` - The target database table (e.g., `events::table`,
    ///   `transactions::table`)
    /// * `$chunk` - Collection of data to persist (must implement `.len()` and
    ///   `.chunks()`)
    /// * `$pool` - Database connection pool reference
    ///
    /// # Behavior
    ///
    /// 1. **Performance Timing**: Records operation duration for monitoring
    /// 2. **Automatic Batching**: Splits data into chunks of
    ///    `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows
    /// 3. **Transaction Safety**: Uses `transactional_blocking_with_retry!` for
    ///    atomic operations
    /// 4. **Conflict Resolution**: Employs `INSERT ... ON CONFLICT DO NOTHING`
    ///    strategy
    /// 5. **Retry Logic**: Automatically retries failed operations with timeout
    ///    of `PG_DB_COMMIT_SLEEP_DURATION`
    /// 6. **Comprehensive Logging**: Logs success/failure with timing and row
    ///    count information
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let event_batch = vec![/* event data */];
    /// // Persist event data
    /// persist_chunk_into_table!(
    ///     events::table,
    ///     event_batch,
    ///     &connection_pool
    /// ).unwrap();
    ///
    /// let sender_data = vec![/* sender data */];
    /// // Persist transaction senders
    /// persist_chunk_into_table!(
    ///     tx_senders::table,
    ///     sender_data,
    ///     &blocking_pool
    /// ).unwrap();
    /// ```
    #[macro_export]
    macro_rules! persist_chunk_into_table {
        ($table:expr, $chunk:expr, $pool:expr) => {{
            let now = std::time::Instant::now();
            let chunk_len = $chunk.len();
            transactional_blocking_with_retry!(
                $pool,
                |conn| {
                    persist_chunk_into_table_in_existing_connection!($table, $chunk, conn);
                    Ok::<(), IndexerError>(())
                },
                PG_DB_COMMIT_SLEEP_DURATION
            )
            .tap_ok(|_| {
                let elapsed = now.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to {}",
                    chunk_len,
                    stringify!($table),
                );
            })
            .tap_err(|e| {
                tracing::error!("Failed to persist {} with error: {}", stringify!($table), e);
            })
        }};
    }

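    /// Writes `$chunk` through an already-open connection (typically inside a
    /// transaction started by `persist_chunk_into_table!`), splitting it into
    /// batches of `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows and inserting each
    /// batch with `insert_or_ignore_into!`.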
    #[macro_export]
    macro_rules! persist_chunk_into_table_in_existing_connection {
        ($table:expr, $chunk:expr, $conn:expr) => {{
            for chunk in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
                insert_or_ignore_into!($table, chunk, $conn);
            }
        }};
    }

    pub use blocking_call_is_ok_or_panic;
    pub use read_only_blocking;
    pub use read_only_repeatable_blocking;
    pub use run_query;
    pub use run_query_async;
    pub use run_query_repeatable;
    pub use run_query_repeatable_async;
    pub use spawn_read_only_blocking;
    pub use transactional_blocking_with_retry;
}