// iota_indexer/store/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
6pub(crate) use indexer_store::*;
7pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
8pub use pg_indexer_store::{PgIndexerStore, TxGlobalOrderCursor};
9
10mod indexer_analytics_store;
11pub mod indexer_store;
12pub mod package_resolver;
13mod pg_indexer_analytical_store;
14mod pg_indexer_store;
15pub mod pg_partition_manager;
16
17pub mod diesel_macro {
18    thread_local! {
19        pub static CALLED_FROM_BLOCKING_POOL: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
20    }
21
22    /// Marks the current thread as being in a blocking pool.
23    ///
24    /// Call this at the start of any `spawn_blocking` closure that will perform
25    /// blocking DB operations.
26    pub fn mark_in_blocking_pool() {
27        CALLED_FROM_BLOCKING_POOL.with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
28    }
29
30    #[macro_export]
31    macro_rules! read_only_repeatable_blocking {
32        ($pool:expr, $query:expr) => {{
33            use downcast::Any;
34            use $crate::db::{PoolConnection, get_pool_connection};
35
36            let mut pool_conn = get_pool_connection($pool)?;
37            pool_conn
38                .as_any_mut()
39                .downcast_mut::<PoolConnection>()
40                .unwrap()
41                .build_transaction()
42                .read_only()
43                .repeatable_read()
44                .run($query)
45                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
46        }};
47    }
48
49    #[macro_export]
50    macro_rules! read_only_blocking {
51        ($pool:expr, $query:expr) => {{
52            use downcast::Any;
53            use $crate::db::{PoolConnection, get_pool_connection};
54
55            let mut pool_conn = get_pool_connection($pool)?;
56            pool_conn
57                .as_any_mut()
58                .downcast_mut::<PoolConnection>()
59                .unwrap()
60                .build_transaction()
61                .read_only()
62                .run($query)
63                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
64        }};
65    }
66
67    #[macro_export]
68    macro_rules! transactional_blocking_with_retry {
69        ($pool:expr, $query:expr, $max_elapsed:expr) => {{
70            use $crate::{
71                db::{PoolConnection, get_pool_connection},
72                errors::IndexerError,
73            };
74            let mut backoff = backoff::ExponentialBackoff::default();
75            backoff.max_elapsed_time = Some($max_elapsed);
76            let result = match backoff::retry(backoff, || {
77                let mut pool_conn =
78                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
79                        err: IndexerError::PostgresWrite(e.to_string()),
80                        retry_after: None,
81                    })?;
82                pool_conn
83                    .as_any_mut()
84                    .downcast_mut::<PoolConnection>()
85                    .unwrap()
86                    .build_transaction()
87                    .read_write()
88                    .run($query)
89                    .map_err(|e| {
90                        tracing::error!("error with persisting data into DB: {e:?}, retrying...");
91                        backoff::Error::Transient {
92                            err: IndexerError::PostgresWrite(e.to_string()),
93                            retry_after: None,
94                        }
95                    })
96            }) {
97                Ok(v) => Ok(v),
98                Err(backoff::Error::Transient { err, .. }) => Err(err),
99                Err(backoff::Error::Permanent(err)) => Err(err),
100            };
101
102            result
103        }};
104    }
105
106    #[macro_export]
107    macro_rules! transactional_blocking_with_retry_with_conditional_abort {
108        ($pool:expr, $query:expr, $abort_condition:expr, $max_elapsed:expr) => {{
109            use $crate::{
110                db::{PoolConnection, get_pool_connection},
111                errors::IndexerError,
112            };
113            let mut backoff = backoff::ExponentialBackoff::default();
114            backoff.max_elapsed_time = Some($max_elapsed);
115            let result = match backoff::retry(backoff, || {
116                let mut pool_conn =
117                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
118                        err: IndexerError::PostgresWrite(e.to_string()),
119                        retry_after: None,
120                    })?;
121                pool_conn
122                    .as_any_mut()
123                    .downcast_mut::<PoolConnection>()
124                    .unwrap()
125                    .build_transaction()
126                    .read_write()
127                    .run($query)
128                    .map_err(|e| {
129                        tracing::error!("error with persisting data into DB: {e:?}, retrying...");
130                        if $abort_condition(&e) {
131                            backoff::Error::Permanent(e)
132                        } else {
133                            backoff::Error::Transient {
134                                err: IndexerError::PostgresWrite(e.to_string()),
135                                retry_after: None,
136                            }
137                        }
138                    })
139            }) {
140                Ok(v) => Ok(v),
141                Err(backoff::Error::Transient { err, .. }) => Err(err),
142                Err(backoff::Error::Permanent(err)) => Err(err),
143            };
144
145            result
146        }};
147    }
148
    /// Runs a read-only query on tokio's blocking thread pool.
    ///
    /// Marks the spawned thread via `mark_in_blocking_pool` (so nested
    /// `blocking_call_is_ok_or_panic!` checks pass) and enters the caller's
    /// tracing span inside the blocking task. When `$repeatable_read` is true
    /// the transaction runs at REPEATABLE READ isolation; otherwise the
    /// default isolation level is used.
    #[macro_export]
    macro_rules! spawn_read_only_blocking {
        ($pool:expr, $query:expr, $repeatable_read:expr) => {{
            use downcast::Any;
            use $crate::{
                db::{PoolConnection, get_pool_connection},
                errors::IndexerError,
                store::diesel_macro::mark_in_blocking_pool,
            };
            // Capture the current span so logs inside the blocking task stay
            // attached to the caller's trace.
            let current_span = tracing::Span::current();
            tokio::task::spawn_blocking(move || {
                mark_in_blocking_pool();
                let _guard = current_span.enter();
                // NOTE(review): a pool acquisition failure panics the blocking
                // task here instead of becoming an IndexerError — confirm this
                // is intended.
                let mut pool_conn = get_pool_connection($pool).unwrap();

                if $repeatable_read {
                    // Read-only transaction at REPEATABLE READ isolation.
                    pool_conn
                        .as_any_mut()
                        .downcast_mut::<PoolConnection>()
                        .unwrap()
                        .build_transaction()
                        .read_only()
                        .repeatable_read()
                        .run($query)
                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
                } else {
                    // Read-only transaction at the default isolation level.
                    pool_conn
                        .as_any_mut()
                        .downcast_mut::<PoolConnection>()
                        .unwrap()
                        .build_transaction()
                        .read_only()
                        .run($query)
                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
                }
            })
            .await
            .expect("blocking call failed")
        }};
    }
189
190    #[macro_export]
191    macro_rules! insert_or_ignore_into {
192        ($table:expr, $values:expr, $conn:expr) => {{
193            use diesel::RunQueryDsl;
194            let error_message = concat!("failed to write to ", stringify!($table), " DB");
195
196            diesel::insert_into($table)
197                .values($values)
198                .on_conflict_do_nothing()
199                .execute($conn)
200                .map_err(IndexerError::from)
201                .context(error_message)?;
202        }};
203    }
204
205    #[macro_export]
206    macro_rules! on_conflict_do_update {
207        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
208            use diesel::{ExpressionMethods, RunQueryDsl};
209
210            diesel::insert_into($table)
211                .values($values)
212                .on_conflict($target)
213                .do_update()
214                .set($pg_columns)
215                .execute($conn)?;
216        }};
217    }
218
219    #[macro_export]
220    macro_rules! on_conflict_do_update_with_condition {
221        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
222            use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};
223
224            diesel::insert_into($table)
225                .values($values)
226                .on_conflict($target)
227                .do_update()
228                .set($pg_columns)
229                .filter($condition)
230                .execute($conn)?;
231        }};
232    }
233
    /// Runs a read-only query, first asserting that the current thread is
    /// allowed to perform blocking calls (see `blocking_call_is_ok_or_panic!`).
    #[macro_export]
    macro_rules! run_query {
        ($pool:expr, $query:expr) => {{
            blocking_call_is_ok_or_panic!();
            read_only_blocking!($pool, $query)
        }};
    }
241
    /// Runs a read-only query at REPEATABLE READ isolation, first asserting
    /// that the current thread is allowed to perform blocking calls.
    #[macro_export]
    macro_rules! run_query_repeatable {
        ($pool:expr, $query:expr) => {{
            blocking_call_is_ok_or_panic!();
            read_only_repeatable_blocking!($pool, $query)
        }};
    }
249
250    #[macro_export]
251    macro_rules! run_query_with_retry {
252        ($pool:expr, $query:expr, $max_elapsed:expr) => {{
253            blocking_call_is_ok_or_panic!();
254            let mut backoff = backoff::ExponentialBackoff::default();
255            backoff.max_elapsed_time = Some($max_elapsed);
256            let result = match backoff::retry(backoff, || {
257                read_only_blocking!($pool, $query).map_err(|e| {
258                    tracing::error!("error with reading data from DB: {e:?}, retrying...");
259                    backoff::Error::Transient {
260                        err: e,
261                        retry_after: None,
262                    }
263                })
264            }) {
265                Ok(v) => Ok(v),
266                Err(backoff::Error::Transient { err, .. }) => Err(err),
267                Err(backoff::Error::Permanent(err)) => Err(err),
268            };
269
270            result
271        }};
272    }
273
    /// Async read-only query at the default isolation level; delegates to
    /// `spawn_read_only_blocking!` with `repeatable_read = false`.
    #[macro_export]
    macro_rules! run_query_async {
        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, false) }};
    }
278
    /// Async read-only query at REPEATABLE READ isolation; delegates to
    /// `spawn_read_only_blocking!` with `repeatable_read = true`.
    #[macro_export]
    macro_rules! run_query_repeatable_async {
        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, true) }};
    }
283
284    /// Check that we are in a context conducive to making blocking calls.
285    /// This is done by either:
286    /// - Checking that we are not inside a tokio runtime context
287    ///
288    /// Or:
289    /// - If we are inside a tokio runtime context, ensure that the call went
290    ///   through `IndexerReader::spawn_blocking` which properly moves the
291    ///   blocking call to a blocking thread pool.
292    #[macro_export]
293    macro_rules! blocking_call_is_ok_or_panic {
294        () => {{
295            use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;
296            if tokio::runtime::Handle::try_current().is_ok()
297                && !CALLED_FROM_BLOCKING_POOL.with(|in_blocking_pool| *in_blocking_pool.borrow())
298            {
299                panic!(
300                    "you are calling a blocking DB operation directly on an async thread. \
301                        Please use IndexerReader::spawn_blocking instead to move the \
302                        operation to a blocking thread"
303                );
304            }
305        }};
306    }
307
308    /// This macro provides a standardized way to bulk insert data into database
309    /// tables with built-in performance monitoring, error handling, and
310    /// retry logic. It automatically subdivides large chunks into smaller
311    /// batches to optimize database performance and avoid overwhelming
312    /// individual transactions.
313    ///
314    /// # Parameters
315    ///
316    /// * `$table` - The target database table (e.g., `events::table`,
317    ///   `transactions::table`)
318    /// * `$chunk` - Collection of data to persist (must implement `.len()` and
319    ///   `.chunks()`)
320    /// * `$pool` - Database connection pool reference
321    ///
322    /// # Behavior
323    ///
324    /// 1. **Performance Timing**: Records operation duration for monitoring
325    /// 2. **Automatic Batching**: Splits data into chunks of
326    ///    `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows
327    /// 3. **Transaction Safety**: Uses `transactional_blocking_with_retry!` for
328    ///    atomic operations
329    /// 4. **Conflict Resolution**: Employs `INSERT ... ON CONFLICT DO NOTHING`
330    ///    strategy
331    /// 5. **Retry Logic**: Automatically retries failed operations with timeout
332    ///    of `PG_DB_COMMIT_SLEEP_DURATION`
333    /// 6. **Comprehensive Logging**: Logs success/failure with timing and row
334    ///    count information
335    ///
336    /// # Examples
337    ///
338    /// ```rust,ignore
339    /// let event_batch = vec![/* event data */];
340    /// // Persist event data
341    /// persist_chunk_into_table!(
342    ///     events::table,
343    ///     event_batch,
344    ///     &connection_pool
345    /// ).unwrap();
346    ///
347    /// let sender_data = vec![/* sender data */];
348    /// // Persist transaction senders
349    /// persist_chunk_into_table!(
350    ///     tx_senders::table,
351    ///     sender_data,
352    ///     &blocking_pool
353    /// ).unwrap();
354    /// ```
355    #[macro_export]
356    macro_rules! persist_chunk_into_table {
357        ($table:expr, $chunk:expr, $pool:expr) => {{
358            let now = std::time::Instant::now();
359            let chunk_len = $chunk.len();
360            transactional_blocking_with_retry!(
361                $pool,
362                |conn| {
363                    persist_chunk_into_table_in_existing_connection!($table, $chunk, conn);
364                    Ok::<(), IndexerError>(())
365                },
366                PG_DB_COMMIT_SLEEP_DURATION
367            )
368            .tap_ok(|_| {
369                let elapsed = now.elapsed().as_secs_f64();
370                info!(
371                    elapsed,
372                    "Persisted {} rows to {}",
373                    chunk_len,
374                    stringify!($table),
375                );
376            })
377            .tap_err(|e| {
378                tracing::error!("failed to persist {} with error: {e}", stringify!($table));
379            })
380        }};
381    }
382
383    #[macro_export]
384    macro_rules! persist_chunk_into_table_in_existing_connection {
385        ($table:expr, $chunk:expr, $conn:expr) => {{
386            for chunk in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
387                insert_or_ignore_into!($table, chunk, $conn);
388            }
389        }};
390    }
391
392    pub use blocking_call_is_ok_or_panic;
393    pub use read_only_blocking;
394    pub use read_only_repeatable_blocking;
395    pub use run_query;
396    pub use run_query_async;
397    pub use run_query_repeatable;
398    pub use run_query_repeatable_async;
399    pub use spawn_read_only_blocking;
400    pub use transactional_blocking_with_retry;
401}