iota_indexer/store/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
6pub(crate) use indexer_store::*;
7pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
8pub use pg_indexer_store::PgIndexerStore;
9
10mod indexer_analytics_store;
11pub mod indexer_store;
12pub mod package_resolver;
13mod pg_indexer_analytical_store;
14mod pg_indexer_store;
15pub mod pg_partition_manager;
16
17pub mod diesel_macro {
18    thread_local! {
19        pub static CALLED_FROM_BLOCKING_POOL: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
20    }
21
22    #[macro_export]
23    macro_rules! read_only_repeatable_blocking {
24        ($pool:expr, $query:expr) => {{
25            use downcast::Any;
26            use $crate::db::{PoolConnection, get_pool_connection};
27
28            let mut pool_conn = get_pool_connection($pool)?;
29            pool_conn
30                .as_any_mut()
31                .downcast_mut::<PoolConnection>()
32                .unwrap()
33                .build_transaction()
34                .read_only()
35                .repeatable_read()
36                .run($query)
37                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
38        }};
39    }
40
41    #[macro_export]
42    macro_rules! read_only_blocking {
43        ($pool:expr, $query:expr) => {{
44            use downcast::Any;
45            use $crate::db::{PoolConnection, get_pool_connection};
46
47            let mut pool_conn = get_pool_connection($pool)?;
48            pool_conn
49                .as_any_mut()
50                .downcast_mut::<PoolConnection>()
51                .unwrap()
52                .build_transaction()
53                .read_only()
54                .run($query)
55                .map_err(|e| IndexerError::PostgresRead(e.to_string()))
56        }};
57    }
58
59    #[macro_export]
60    macro_rules! transactional_blocking_with_retry {
61        ($pool:expr, $query:expr, $max_elapsed:expr) => {{
62            use $crate::{
63                db::{PoolConnection, get_pool_connection},
64                errors::IndexerError,
65            };
66            let mut backoff = backoff::ExponentialBackoff::default();
67            backoff.max_elapsed_time = Some($max_elapsed);
68            let result = match backoff::retry(backoff, || {
69                let mut pool_conn =
70                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
71                        err: IndexerError::PostgresWrite(e.to_string()),
72                        retry_after: None,
73                    })?;
74                pool_conn
75                    .as_any_mut()
76                    .downcast_mut::<PoolConnection>()
77                    .unwrap()
78                    .build_transaction()
79                    .read_write()
80                    .run($query)
81                    .map_err(|e| {
82                        tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
83                        backoff::Error::Transient {
84                            err: IndexerError::PostgresWrite(e.to_string()),
85                            retry_after: None,
86                        }
87                    })
88            }) {
89                Ok(v) => Ok(v),
90                Err(backoff::Error::Transient { err, .. }) => Err(err),
91                Err(backoff::Error::Permanent(err)) => Err(err),
92            };
93
94            result
95        }};
96    }
97
98    #[macro_export]
99    macro_rules! spawn_read_only_blocking {
100        ($pool:expr, $query:expr, $repeatable_read:expr) => {{
101            use downcast::Any;
102            use $crate::{
103                db::{PoolConnection, get_pool_connection},
104                errors::IndexerError,
105                store::diesel_macro::CALLED_FROM_BLOCKING_POOL,
106            };
107            let current_span = tracing::Span::current();
108            tokio::task::spawn_blocking(move || {
109                CALLED_FROM_BLOCKING_POOL
110                    .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
111                let _guard = current_span.enter();
112                let mut pool_conn = get_pool_connection($pool).unwrap();
113
114                if $repeatable_read {
115                    pool_conn
116                        .as_any_mut()
117                        .downcast_mut::<PoolConnection>()
118                        .unwrap()
119                        .build_transaction()
120                        .read_only()
121                        .repeatable_read()
122                        .run($query)
123                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
124                } else {
125                    pool_conn
126                        .as_any_mut()
127                        .downcast_mut::<PoolConnection>()
128                        .unwrap()
129                        .build_transaction()
130                        .read_only()
131                        .run($query)
132                        .map_err(|e| IndexerError::PostgresRead(e.to_string()))
133                }
134            })
135            .await
136            .expect("Blocking call failed")
137        }};
138    }
139
140    #[macro_export]
141    macro_rules! insert_or_ignore_into {
142        ($table:expr, $values:expr, $conn:expr) => {{
143            use diesel::RunQueryDsl;
144            let error_message = concat!("Failed to write to ", stringify!($table), " DB");
145
146            diesel::insert_into($table)
147                .values($values)
148                .on_conflict_do_nothing()
149                .execute($conn)
150                .map_err(IndexerError::from)
151                .context(error_message)?;
152        }};
153    }
154
155    #[macro_export]
156    macro_rules! on_conflict_do_update {
157        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
158            use diesel::{ExpressionMethods, RunQueryDsl};
159
160            diesel::insert_into($table)
161                .values($values)
162                .on_conflict($target)
163                .do_update()
164                .set($pg_columns)
165                .execute($conn)?;
166        }};
167    }
168
169    #[macro_export]
170    macro_rules! run_query {
171        ($pool:expr, $query:expr) => {{
172            blocking_call_is_ok_or_panic!();
173            read_only_blocking!($pool, $query)
174        }};
175    }
176
177    #[macro_export]
178    macro_rules! run_query_repeatable {
179        ($pool:expr, $query:expr) => {{
180            blocking_call_is_ok_or_panic!();
181            read_only_repeatable_blocking!($pool, $query)
182        }};
183    }
184
185    #[macro_export]
186    macro_rules! run_query_async {
187        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, false) }};
188    }
189
190    #[macro_export]
191    macro_rules! run_query_repeatable_async {
192        ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, true) }};
193    }
194
195    /// Check that we are in a context conducive to making blocking calls.
196    /// This is done by either:
197    /// - Checking that we are not inside a tokio runtime context
198    ///
199    /// Or:
200    /// - If we are inside a tokio runtime context, ensure that the call went
201    ///   through `IndexerReader::spawn_blocking` which properly moves the
202    ///   blocking call to a blocking thread pool.
203    #[macro_export]
204    macro_rules! blocking_call_is_ok_or_panic {
205        () => {{
206            use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;
207            if tokio::runtime::Handle::try_current().is_ok()
208                && !CALLED_FROM_BLOCKING_POOL.with(|in_blocking_pool| *in_blocking_pool.borrow())
209            {
210                panic!(
211                    "You are calling a blocking DB operation directly on an async thread. \
212                        Please use IndexerReader::spawn_blocking instead to move the \
213                        operation to a blocking thread"
214                );
215            }
216        }};
217    }
218
219    #[macro_export]
220    macro_rules! persist_chunk_into_table {
221        ($table:expr, $chunk:expr, $pool:expr) => {{
222            let now = std::time::Instant::now();
223            let chunk_len = $chunk.len();
224            transactional_blocking_with_retry!(
225                $pool,
226                |conn| {
227                    for chunk in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
228                        insert_or_ignore_into!($table, chunk, conn);
229                    }
230                    Ok::<(), IndexerError>(())
231                },
232                PG_DB_COMMIT_SLEEP_DURATION
233            )
234            .tap_ok(|_| {
235                let elapsed = now.elapsed().as_secs_f64();
236                info!(
237                    elapsed,
238                    "Persisted {} rows to {}",
239                    chunk_len,
240                    stringify!($table),
241                );
242            })
243            .tap_err(|e| {
244                tracing::error!("Failed to persist {} with error: {}", stringify!($table), e);
245            })
246        }};
247    }
248
249    pub use blocking_call_is_ok_or_panic;
250    pub use read_only_blocking;
251    pub use read_only_repeatable_blocking;
252    pub use run_query;
253    pub use run_query_async;
254    pub use run_query_repeatable;
255    pub use run_query_repeatable_async;
256    pub use spawn_read_only_blocking;
257    pub use transactional_blocking_with_retry;
258}