iota_indexer/store/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
6pub(crate) use indexer_store::*;
7pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
8pub use pg_indexer_store::{PgIndexerStore, TxGlobalOrderCursor};
9
10mod indexer_analytics_store;
11pub mod indexer_store;
12pub mod package_resolver;
13mod pg_indexer_analytical_store;
14mod pg_indexer_store;
15pub mod pg_partition_manager;
16
17pub mod diesel_macro {
18    thread_local! {
19        pub static CALLED_FROM_BLOCKING_POOL: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
20    }
21
22    /// Marks the current thread as being in a blocking pool.
23    ///
24    /// Call this at the start of any `spawn_blocking` closure that will perform
25    /// blocking DB operations.
26    pub fn mark_in_blocking_pool() {
27        CALLED_FROM_BLOCKING_POOL.with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
28    }
29
30    #[macro_export]
31    macro_rules! read_only_repeatable_blocking {
32        ($pool:expr, $query:expr) => {{
33            use downcast::Any;
34            use $crate::db::{PoolConnection, get_pool_connection};
35
36            let mut pool_conn = get_pool_connection($pool)?;
37            pool_conn
38                .as_any_mut()
39                .downcast_mut::<PoolConnection>()
40                .unwrap()
41                .build_transaction()
42                .read_only()
43                .repeatable_read()
44                .run($query)
45                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
46        }};
47    }
48
49    /// Runs a blocking SQL query.
50    ///
51    /// In an async context, it must be wrapped in an spawn blocking task.
52    #[macro_export]
53    macro_rules! read_only_blocking {
54        ($pool:expr, $query:expr) => {{
55            use downcast::Any;
56            use $crate::db::{PoolConnection, get_pool_connection};
57
58            let mut pool_conn = get_pool_connection($pool)?;
59            pool_conn
60                .as_any_mut()
61                .downcast_mut::<PoolConnection>()
62                .unwrap()
63                .build_transaction()
64                .read_only()
65                .run($query)
66                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
67        }};
68    }
69
70    /// Runs a blocking SQL query.
71    ///
72    /// In an async context, it must be wrapped in an spawn blocking task.
73    #[macro_export]
74    macro_rules! transactional_blocking_with_retry {
75        ($pool:expr, $query:expr, $max_elapsed:expr) => {{
76            use $crate::{
77                db::{PoolConnection, get_pool_connection},
78                errors::IndexerError,
79            };
80            let mut backoff = backoff::ExponentialBackoff::default();
81            backoff.max_elapsed_time = Some($max_elapsed);
82            let result = match backoff::retry(backoff, || {
83                let mut pool_conn =
84                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
85                        err: IndexerError::PostgresWrite(e.to_string()),
86                        retry_after: None,
87                    })?;
88                pool_conn
89                    .as_any_mut()
90                    .downcast_mut::<PoolConnection>()
91                    .unwrap()
92                    .build_transaction()
93                    .read_write()
94                    .run($query)
95                    .map_err(|e| {
96                        tracing::error!("error with persisting data into DB: {e:?}, retrying...");
97                        backoff::Error::Transient {
98                            err: IndexerError::PostgresWrite(e.to_string()),
99                            retry_after: None,
100                        }
101                    })
102            }) {
103                Ok(v) => Ok(v),
104                Err(backoff::Error::Transient { err, .. }) => Err(err),
105                Err(backoff::Error::Permanent(err)) => Err(err),
106            };
107
108            result
109        }};
110    }
111
112    /// Runs a blocking SQL query.
113    ///
114    /// In an async context, it must be wrapped in an spawn blocking task.
115    #[macro_export]
116    macro_rules! transactional_blocking_with_retry_with_conditional_abort {
117        ($pool:expr, $query:expr, $abort_condition:expr, $max_elapsed:expr) => {{
118            use $crate::{
119                db::{PoolConnection, get_pool_connection},
120                errors::IndexerError,
121            };
122            let mut backoff = backoff::ExponentialBackoff::default();
123            backoff.max_elapsed_time = Some($max_elapsed);
124            let result = match backoff::retry(backoff, || {
125                let mut pool_conn =
126                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
127                        err: IndexerError::PostgresWrite(e.to_string()),
128                        retry_after: None,
129                    })?;
130                pool_conn
131                    .as_any_mut()
132                    .downcast_mut::<PoolConnection>()
133                    .unwrap()
134                    .build_transaction()
135                    .read_write()
136                    .run($query)
137                    .map_err(|e| {
138                        tracing::error!("error with persisting data into DB: {e:?}, retrying...");
139                        if $abort_condition(&e) {
140                            backoff::Error::Permanent(e)
141                        } else {
142                            backoff::Error::Transient {
143                                err: IndexerError::PostgresWrite(e.to_string()),
144                                retry_after: None,
145                            }
146                        }
147                    })
148            }) {
149                Ok(v) => Ok(v),
150                Err(backoff::Error::Transient { err, .. }) => Err(err),
151                Err(backoff::Error::Permanent(err)) => Err(err),
152            };
153
154            result
155        }};
156    }
157
158    /// Runs an async SQL query wrapped in a spawn blocking task.
159    #[macro_export]
160    macro_rules! spawn_read_only_blocking {
161        ($pool:expr, $query:expr, $repeatable_read:expr) => {{
162            use downcast::Any;
163            use $crate::{
164                db::{PoolConnection, get_pool_connection},
165                errors::IndexerError,
166                store::diesel_macro::mark_in_blocking_pool,
167            };
168            let current_span = tracing::Span::current();
169            tokio::task::spawn_blocking(move || {
170                mark_in_blocking_pool();
171                let _guard = current_span.enter();
172                let mut pool_conn = get_pool_connection($pool).unwrap();
173
174                if $repeatable_read {
175                    pool_conn
176                        .as_any_mut()
177                        .downcast_mut::<PoolConnection>()
178                        .unwrap()
179                        .build_transaction()
180                        .read_only()
181                        .repeatable_read()
182                        .run($query)
183                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
184                } else {
185                    pool_conn
186                        .as_any_mut()
187                        .downcast_mut::<PoolConnection>()
188                        .unwrap()
189                        .build_transaction()
190                        .read_only()
191                        .run($query)
192                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
193                }
194            })
195            .await
196            .expect("blocking call failed")
197        }};
198    }
199
200    #[macro_export]
201    macro_rules! insert_or_ignore_into {
202        ($table:expr, $values:expr, $conn:expr) => {{
203            use diesel::RunQueryDsl;
204            let error_message = concat!("failed to write to ", stringify!($table), " DB");
205
206            diesel::insert_into($table)
207                .values($values)
208                .on_conflict_do_nothing()
209                .execute($conn)
210                .map_err(IndexerError::from)
211                .context(error_message)?;
212        }};
213    }
214
215    #[macro_export]
216    macro_rules! on_conflict_do_update {
217        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
218            use diesel::{ExpressionMethods, RunQueryDsl};
219
220            diesel::insert_into($table)
221                .values($values)
222                .on_conflict($target)
223                .do_update()
224                .set($pg_columns)
225                .execute($conn)?;
226        }};
227    }
228
229    #[macro_export]
230    macro_rules! on_conflict_do_update_with_condition {
231        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
232            use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};
233
234            diesel::insert_into($table)
235                .values($values)
236                .on_conflict($target)
237                .do_update()
238                .set($pg_columns)
239                .filter($condition)
240                .execute($conn)?;
241        }};
242    }
243
244    /// Runs a blocking SQL query.
245    ///
246    /// In an async context, it must be wrapped in an spawn blocking task.
247    #[macro_export]
248    macro_rules! run_query {
249        ($pool:expr, $query:expr) => {{
250            blocking_call_is_ok_or_panic!();
251            read_only_blocking!($pool, $query)
252        }};
253    }
254
255    /// Runs a blocking SQL query.
256    ///
257    /// In an async context, it must be wrapped in an spawn blocking task.
258    #[macro_export]
259    macro_rules! run_query_repeatable {
260        ($pool:expr, $query:expr) => {{
261            blocking_call_is_ok_or_panic!();
262            read_only_repeatable_blocking!($pool, $query)
263        }};
264    }
265
266    /// Runs a blocking SQL query.
267    ///
268    /// In an async context, it must be wrapped in an spawn blocking task.
269    #[macro_export]
270    macro_rules! run_query_with_retry {
271        ($pool:expr, $query:expr, $max_elapsed:expr) => {{
272            blocking_call_is_ok_or_panic!();
273            let mut backoff = backoff::ExponentialBackoff::default();
274            backoff.max_elapsed_time = Some($max_elapsed);
275            let result = match backoff::retry(backoff, || {
276                read_only_blocking!($pool, $query).map_err(|e| {
277                    tracing::error!("error with reading data from DB: {e:?}, retrying...");
278                    backoff::Error::Transient {
279                        err: e,
280                        retry_after: None,
281                    }
282                })
283            }) {
284                Ok(v) => Ok(v),
285                Err(backoff::Error::Transient { err, .. }) => Err(err),
286                Err(backoff::Error::Permanent(err)) => Err(err),
287            };
288
289            result
290        }};
291    }
292
293    /// Runs an async SQL query wrapped in a spawn blocking task.
294    #[macro_export]
295    macro_rules! run_query_async {
296        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, false) }};
297    }
298
299    #[macro_export]
300    macro_rules! run_query_repeatable_async {
301        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, true) }};
302    }
303
304    /// Check that we are in a context conducive to making blocking calls.
305    /// This is done by either:
306    /// - Checking that we are not inside a tokio runtime context
307    ///
308    /// Or:
309    /// - If we are inside a tokio runtime context, ensure that the call went
310    ///   through `IndexerReader::spawn_blocking` which properly moves the
311    ///   blocking call to a blocking thread pool.
312    #[macro_export]
313    macro_rules! blocking_call_is_ok_or_panic {
314        () => {{
315            use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;
316            if tokio::runtime::Handle::try_current().is_ok()
317                && !CALLED_FROM_BLOCKING_POOL.with(|in_blocking_pool| *in_blocking_pool.borrow())
318            {
319                panic!(
320                    "you are calling a blocking DB operation directly on an async thread. \
321                        Please use IndexerReader::spawn_blocking instead to move the \
322                        operation to a blocking thread"
323                );
324            }
325        }};
326    }
327
328    /// This macro provides a standardized way to bulk insert data into database
329    /// tables with built-in performance monitoring, error handling, and
330    /// retry logic. It automatically subdivides large chunks into smaller
331    /// batches to optimize database performance and avoid overwhelming
332    /// individual transactions.
333    ///
334    /// # Parameters
335    ///
336    /// * `$table` - The target database table (e.g., `events::table`,
337    ///   `transactions::table`)
338    /// * `$chunk` - Collection of data to persist (must implement `.len()` and
339    ///   `.chunks()`)
340    /// * `$pool` - Database connection pool reference
341    ///
342    /// # Behavior
343    ///
344    /// 1. **Performance Timing**: Records operation duration for monitoring
345    /// 2. **Automatic Batching**: Splits data into chunks of
346    ///    `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows
347    /// 3. **Transaction Safety**: Uses `transactional_blocking_with_retry!` for
348    ///    atomic operations
349    /// 4. **Conflict Resolution**: Employs `INSERT ... ON CONFLICT DO NOTHING`
350    ///    strategy
351    /// 5. **Retry Logic**: Automatically retries failed operations with timeout
352    ///    of `PG_DB_COMMIT_SLEEP_DURATION`
353    /// 6. **Comprehensive Logging**: Logs success/failure with timing and row
354    ///    count information
355    ///
356    /// # Examples
357    ///
358    /// ```rust,ignore
359    /// let event_batch = vec![/* event data */];
360    /// // Persist event data
361    /// persist_chunk_into_table!(
362    ///     events::table,
363    ///     event_batch,
364    ///     &connection_pool
365    /// ).unwrap();
366    ///
367    /// let sender_data = vec![/* sender data */];
368    /// // Persist transaction senders
369    /// persist_chunk_into_table!(
370    ///     tx_senders::table,
371    ///     sender_data,
372    ///     &blocking_pool
373    /// ).unwrap();
374    /// ```
375    #[macro_export]
376    macro_rules! persist_chunk_into_table {
377        ($table:expr, $chunk:expr, $pool:expr) => {{
378            let now = std::time::Instant::now();
379            let chunk_len = $chunk.len();
380            transactional_blocking_with_retry!(
381                $pool,
382                |conn| {
383                    persist_chunk_into_table_in_existing_connection!($table, $chunk, conn);
384                    Ok::<(), IndexerError>(())
385                },
386                PG_DB_COMMIT_SLEEP_DURATION
387            )
388            .tap_ok(|_| {
389                let elapsed = now.elapsed().as_secs_f64();
390                info!(
391                    elapsed,
392                    "Persisted {} rows to {}",
393                    chunk_len,
394                    stringify!($table),
395                );
396            })
397            .tap_err(|e| {
398                tracing::error!("failed to persist {} with error: {e}", stringify!($table));
399            })
400        }};
401    }
402
403    #[macro_export]
404    macro_rules! persist_chunk_into_table_in_existing_connection {
405        ($table:expr, $chunk:expr, $conn:expr) => {{
406            for chunk in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
407                insert_or_ignore_into!($table, chunk, $conn);
408            }
409        }};
410    }
411
412    pub use blocking_call_is_ok_or_panic;
413    pub use read_only_blocking;
414    pub use read_only_repeatable_blocking;
415    pub use run_query;
416    pub use run_query_async;
417    pub use run_query_repeatable;
418    pub use run_query_repeatable_async;
419    pub use spawn_read_only_blocking;
420    pub use transactional_blocking_with_retry;
421}