1pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
6pub(crate) use indexer_store::*;
7pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
8pub use pg_indexer_store::{PgIndexerStore, TxGlobalOrderCursor};
9
10mod indexer_analytics_store;
11pub mod indexer_store;
12pub mod package_resolver;
13mod pg_indexer_analytical_store;
14mod pg_indexer_store;
15pub mod pg_partition_manager;
16
17pub mod diesel_macro {
thread_local! {
    /// Per-thread flag, initially `false`, that is switched to `true` by
    /// [`mark_in_blocking_pool`]. The blocking-call guard macro in this module
    /// reads it to decide whether a blocking DB call is allowed on the
    /// current thread.
    pub static CALLED_FROM_BLOCKING_POOL: std::cell::RefCell<bool> =
        const { std::cell::RefCell::new(false) };
}

/// Marks the current thread as one that may run blocking DB operations.
///
/// Only the calling thread's copy of [`CALLED_FROM_BLOCKING_POOL`] is
/// changed; the flag is never reset by this function.
pub fn mark_in_blocking_pool() {
    CALLED_FROM_BLOCKING_POOL.with(|flag| {
        flag.replace(true);
    });
}
29
/// Runs `$query` inside a read-only, repeatable-read transaction on a
/// connection checked out from `$pool`, blocking the current thread.
///
/// Evaluates to a `Result`; a failed transaction is reported as
/// `IndexerError::PostgresRead` carrying the stringified error.
///
/// Call-site requirements: `IndexerError` must be in scope (this macro does
/// not import it), and since a failed connection checkout is propagated with
/// `?`, the enclosing function or closure must return a compatible `Result`.
///
/// # Panics
///
/// Panics if the checked-out connection cannot be downcast to
/// `PoolConnection`.
#[macro_export]
macro_rules! read_only_repeatable_blocking {
    ($pool:expr, $query:expr) => {{
        use downcast::Any;
        use $crate::db::{PoolConnection, get_pool_connection};

        let mut conn_handle = get_pool_connection($pool)?;
        let pg_conn = conn_handle
            .as_any_mut()
            .downcast_mut::<PoolConnection>()
            .unwrap();

        pg_conn
            .build_transaction()
            .read_only()
            .repeatable_read()
            .run($query)
            .map_err(|e| IndexerError::PostgresRead(e.to_string()))
    }};
}
48
/// Runs `$query` inside a read-only transaction on a connection checked out
/// from `$pool`, blocking the current thread.
///
/// Evaluates to a `Result`; a failed transaction is reported as
/// `IndexerError::PostgresRead` carrying the stringified error.
///
/// Call-site requirements: `IndexerError` must be in scope (this macro does
/// not import it), and since a failed connection checkout is propagated with
/// `?`, the enclosing function or closure must return a compatible `Result`.
///
/// # Panics
///
/// Panics if the checked-out connection cannot be downcast to
/// `PoolConnection`.
#[macro_export]
macro_rules! read_only_blocking {
    ($pool:expr, $query:expr) => {{
        use downcast::Any;
        use $crate::db::{PoolConnection, get_pool_connection};

        let mut conn_handle = get_pool_connection($pool)?;
        let pg_conn = conn_handle
            .as_any_mut()
            .downcast_mut::<PoolConnection>()
            .unwrap();

        pg_conn
            .build_transaction()
            .read_only()
            .run($query)
            .map_err(|e| IndexerError::PostgresRead(e.to_string()))
    }};
}
69
/// Runs `$query` inside a read-write transaction, retrying with exponential
/// backoff until it succeeds or `$max_elapsed` has passed.
///
/// Every failure, whether checking out a connection from `$pool` or running
/// the transaction, is treated as transient and wrapped in
/// `IndexerError::PostgresWrite` with the stringified cause. The macro
/// evaluates to `Result<_, IndexerError>` holding the last error once the
/// retry budget is exhausted.
///
/// # Panics
///
/// Panics if the checked-out connection cannot be downcast to
/// `PoolConnection`.
#[macro_export]
macro_rules! transactional_blocking_with_retry {
    ($pool:expr, $query:expr, $max_elapsed:expr) => {{
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut retry_policy = backoff::ExponentialBackoff::default();
        retry_policy.max_elapsed_time = Some($max_elapsed);

        backoff::retry(retry_policy, || {
            let mut conn_handle =
                get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                    err: IndexerError::PostgresWrite(e.to_string()),
                    retry_after: None,
                })?;
            let pg_conn = conn_handle
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .unwrap();

            pg_conn
                .build_transaction()
                .read_write()
                .run($query)
                .map_err(|e| {
                    tracing::error!("error with persisting data into DB: {e:?}, retrying...");
                    backoff::Error::Transient {
                        err: IndexerError::PostgresWrite(e.to_string()),
                        retry_after: None,
                    }
                })
        })
        // Strip the backoff wrapper: both variants carry the same inner error.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } | backoff::Error::Permanent(err) => err,
        })
    }};
}
111
/// Runs `$query` inside a read-write transaction with exponential-backoff
/// retries, giving up early when `$abort_condition` says the error is not
/// worth retrying.
///
/// * `$pool` - connection pool handed to `get_pool_connection`.
/// * `$query` - closure run inside the transaction.
/// * `$abort_condition` - callable invoked as `$abort_condition(&e)` on each
///   transaction error; returning `true` stops retrying immediately and the
///   error `e` is returned unchanged.
/// * `$max_elapsed` - upper bound on the total time spent retrying.
///
/// Errors that do not trigger the abort condition (including connection
/// checkout failures) are wrapped in `IndexerError::PostgresWrite` and
/// retried. The macro evaluates to `Result<_, IndexerError>`.
///
/// # Panics
///
/// Panics if the checked-out connection cannot be downcast to
/// `PoolConnection`.
#[macro_export]
macro_rules! transactional_blocking_with_retry_with_conditional_abort {
    ($pool:expr, $query:expr, $abort_condition:expr, $max_elapsed:expr) => {{
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some($max_elapsed);

        backoff::retry(backoff, || {
            let mut pool_conn =
                get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                    err: IndexerError::PostgresWrite(e.to_string()),
                    retry_after: None,
                })?;
            pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .unwrap()
                .build_transaction()
                .read_write()
                .run($query)
                .map_err(|e| {
                    if $abort_condition(&e) {
                        // Permanent errors end the retry loop, so the log must
                        // not claim that another attempt will follow.
                        tracing::error!(
                            "error with persisting data into DB: {e:?}, aborting without retry"
                        );
                        backoff::Error::Permanent(e)
                    } else {
                        tracing::error!("error with persisting data into DB: {e:?}, retrying...");
                        backoff::Error::Transient {
                            err: IndexerError::PostgresWrite(e.to_string()),
                            retry_after: None,
                        }
                    }
                })
        })
        // Strip the backoff wrapper: both variants carry an `IndexerError`.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } | backoff::Error::Permanent(err) => err,
        })
    }};
}
157
/// Runs `$query` in a read-only transaction on tokio's blocking thread pool
/// and awaits the result.
///
/// * `$pool` - connection pool handed to `get_pool_connection`; it is moved
///   into the spawned closure.
/// * `$query` - closure run inside the transaction.
/// * `$repeatable_read` - when `true`, the transaction additionally requests
///   repeatable-read isolation.
///
/// The spawned thread is flagged with `mark_in_blocking_pool` and re-enters
/// the caller's tracing span. The macro must be used in an async context
/// (it contains `.await`) and evaluates to `Result<_, IndexerError>`:
/// a failed connection checkout is returned as-is, a failed transaction as
/// `IndexerError::PostgresRead`.
///
/// # Panics
///
/// Panics if the blocking task itself panics or is cancelled, or if the
/// checked-out connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! spawn_read_only_blocking {
    ($pool:expr, $query:expr, $repeatable_read:expr) => {{
        use downcast::Any;
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
            store::diesel_macro::mark_in_blocking_pool,
        };
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            mark_in_blocking_pool();
            let _guard = current_span.enter();
            // A pool that cannot hand out a connection is an expected runtime
            // condition: surface it as an error to the caller instead of
            // panicking the blocking task (and, through it, the awaiting one).
            let mut pool_conn = get_pool_connection($pool)?;

            if $repeatable_read {
                pool_conn
                    .as_any_mut()
                    .downcast_mut::<PoolConnection>()
                    .unwrap()
                    .build_transaction()
                    .read_only()
                    .repeatable_read()
                    .run($query)
                    .map_err(|e| IndexerError::PostgresRead(e.to_string()))
            } else {
                pool_conn
                    .as_any_mut()
                    .downcast_mut::<PoolConnection>()
                    .unwrap()
                    .build_transaction()
                    .read_only()
                    .run($query)
                    .map_err(|e| IndexerError::PostgresRead(e.to_string()))
            }
        })
        .await
        .expect("blocking call failed")
    }};
}
199
/// Inserts `$values` into `$table` over `$conn`, silently skipping rows that
/// conflict with existing ones (`ON CONFLICT DO NOTHING`).
///
/// On failure the error is converted with `IndexerError::from`, annotated
/// with `"failed to write to <table> DB"`, and propagated with `?`, so the
/// enclosing function or closure must return a compatible `Result`.
///
/// Call-site requirements: `IndexerError` and a trait providing `.context(..)`
/// on the resulting `Result` must be in scope; this macro imports neither.
#[macro_export]
macro_rules! insert_or_ignore_into {
    ($table:expr, $values:expr, $conn:expr) => {{
        use diesel::RunQueryDsl;

        diesel::insert_into($table)
            .values($values)
            .on_conflict_do_nothing()
            .execute($conn)
            .map_err(IndexerError::from)
            .context(concat!("failed to write to ", stringify!($table), " DB"))?;
    }};
}
214
/// Upserts `$values` into `$table` over `$conn`: rows conflicting on
/// `$target` are updated with the assignments in `$pg_columns`.
///
/// Any execution error is propagated with `?`, so the enclosing function or
/// closure must return a `Result` whose error type can be built from it.
#[macro_export]
macro_rules! on_conflict_do_update {
    ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
        // `ExpressionMethods` is imported for the caller-supplied expressions
        // expanded inside this block.
        use diesel::ExpressionMethods;
        use diesel::RunQueryDsl;

        // Kept as a single statement so temporaries created by the macro
        // arguments live until the query has executed.
        diesel::insert_into($table)
            .values($values)
            .on_conflict($target)
            .do_update()
            .set($pg_columns)
            .execute($conn)?;
    }};
}
228
/// Upserts `$values` into `$table` over `$conn`: rows conflicting on
/// `$target` are updated with the assignments in `$pg_columns`, but only
/// where `$condition` holds.
///
/// Any execution error is propagated with `?`, so the enclosing function or
/// closure must return a `Result` whose error type can be built from it.
#[macro_export]
macro_rules! on_conflict_do_update_with_condition {
    ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
        // `ExpressionMethods` is imported for the caller-supplied expressions
        // expanded inside this block; `FilterDsl` provides `.filter(..)`.
        use diesel::ExpressionMethods;
        use diesel::RunQueryDsl;
        use diesel::query_dsl::methods::FilterDsl;

        // Kept as a single statement so temporaries created by the macro
        // arguments live until the query has executed.
        diesel::insert_into($table)
            .values($values)
            .on_conflict($target)
            .do_update()
            .set($pg_columns)
            .filter($condition)
            .execute($conn)?;
    }};
}
243
/// Guarded blocking read: first asserts that the current thread may block
/// (`blocking_call_is_ok_or_panic!`), then runs `$query` against `$pool` via
/// `read_only_blocking!`.
///
/// Both nested macros are invoked by bare name and must be resolvable at the
/// call site. Evaluates to whatever `read_only_blocking!` evaluates to.
#[macro_export]
macro_rules! run_query {
    ($pool:expr, $query:expr) => {{
        // Refuse to block an async worker thread.
        blocking_call_is_ok_or_panic!();

        read_only_blocking!($pool, $query)
    }};
}
254
/// Guarded blocking read with repeatable-read isolation: first asserts that
/// the current thread may block (`blocking_call_is_ok_or_panic!`), then runs
/// `$query` against `$pool` via `read_only_repeatable_blocking!`.
///
/// Both nested macros are invoked by bare name and must be resolvable at the
/// call site. Evaluates to whatever `read_only_repeatable_blocking!`
/// evaluates to.
#[macro_export]
macro_rules! run_query_repeatable {
    ($pool:expr, $query:expr) => {{
        // Refuse to block an async worker thread.
        blocking_call_is_ok_or_panic!();

        read_only_repeatable_blocking!($pool, $query)
    }};
}
265
/// Guarded blocking read with retries: asserts that the current thread may
/// block, then repeatedly runs `$query` against `$pool` via
/// `read_only_blocking!` under exponential backoff until it succeeds or
/// `$max_elapsed` has passed.
///
/// Every read failure is logged and treated as transient. Once the retry
/// budget is exhausted the macro evaluates to `Err` with the last error;
/// on success it evaluates to `Ok` with the query result.
#[macro_export]
macro_rules! run_query_with_retry {
    ($pool:expr, $query:expr, $max_elapsed:expr) => {{
        blocking_call_is_ok_or_panic!();

        let mut retry_policy = backoff::ExponentialBackoff::default();
        retry_policy.max_elapsed_time = Some($max_elapsed);

        backoff::retry(retry_policy, || {
            read_only_blocking!($pool, $query).map_err(|e| {
                tracing::error!("error with reading data from DB: {e:?}, retrying...");
                backoff::Error::Transient {
                    err: e,
                    retry_after: None,
                }
            })
        })
        // Strip the backoff wrapper: both variants carry the same inner error.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } | backoff::Error::Permanent(err) => err,
        })
    }};
}
292
/// Async read: forwards to `spawn_read_only_blocking!` with the
/// repeatable-read flag set to `false`.
///
/// `spawn_read_only_blocking!` is invoked by bare name and must be
/// resolvable at the call site.
#[macro_export]
macro_rules! run_query_async {
    ($pool:expr, $query:expr) => {{
        spawn_read_only_blocking!($pool, $query, false)
    }};
}
298
/// Async read with repeatable-read isolation: forwards to
/// `spawn_read_only_blocking!` with the repeatable-read flag set to `true`.
///
/// `spawn_read_only_blocking!` is invoked by bare name and must be
/// resolvable at the call site.
#[macro_export]
macro_rules! run_query_repeatable_async {
    ($pool:expr, $query:expr) => {{
        spawn_read_only_blocking!($pool, $query, true)
    }};
}
303
/// Guard for blocking DB calls.
///
/// Does nothing when no tokio runtime handle is available on the current
/// thread, or when the thread has been flagged through
/// `CALLED_FROM_BLOCKING_POOL`.
///
/// # Panics
///
/// Panics when a tokio runtime handle is available on the current thread and
/// the thread has not been flagged, i.e. when a blocking DB operation is
/// about to run directly on an async thread.
#[macro_export]
macro_rules! blocking_call_is_ok_or_panic {
    () => {{
        use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;

        // The thread-local is only consulted when a runtime is present.
        if tokio::runtime::Handle::try_current().is_ok() {
            let on_blocking_thread = CALLED_FROM_BLOCKING_POOL.with(|flag| *flag.borrow());
            if !on_blocking_thread {
                panic!(
                    "you are calling a blocking DB operation directly on an async thread. \
                    Please use IndexerReader::spawn_blocking instead to move the \
                    operation to a blocking thread"
                );
            }
        }
    }};
}
327
/// Persists `$chunk` into `$table` in one retried read-write transaction and
/// logs the outcome.
///
/// The rows are written with
/// `persist_chunk_into_table_in_existing_connection!` inside
/// `transactional_blocking_with_retry!`, using `PG_DB_COMMIT_SLEEP_DURATION`
/// as the retry time budget. On success the row count, table name and
/// elapsed seconds are logged at info level; on failure the error is logged
/// at error level. The macro evaluates to the transaction's `Result`.
///
/// Call-site requirements: `IndexerError`, `PG_DB_COMMIT_SLEEP_DURATION`,
/// the `info!` macro, the nested macros named above, and a trait providing
/// `tap_ok` / `tap_err` on `Result` must all be in scope. Note that `$chunk`
/// is expanded twice (once for `.len()`, once for the insert).
#[macro_export]
macro_rules! persist_chunk_into_table {
    ($table:expr, $chunk:expr, $pool:expr) => {{
        let started_at = std::time::Instant::now();
        let row_count = $chunk.len();

        transactional_blocking_with_retry!(
            $pool,
            |tx_conn| {
                persist_chunk_into_table_in_existing_connection!($table, $chunk, tx_conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            // `elapsed` doubles as the structured field name in the log event.
            let elapsed = started_at.elapsed().as_secs_f64();
            info!(
                elapsed,
                "Persisted {} rows to {}",
                row_count,
                stringify!($table),
            );
        })
        .tap_err(|e| {
            tracing::error!("failed to persist {} with error: {e}", stringify!($table));
        })
    }};
}
402
/// Writes `$chunk` into `$table` over an already-open connection `$conn`,
/// splitting it into sub-slices of at most
/// `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows and inserting each one with
/// `insert_or_ignore_into!`.
///
/// Errors from an insert are propagated with `?` by the nested macro, which
/// stops the loop at the first failing sub-slice.
///
/// Call-site requirements: `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` and
/// `insert_or_ignore_into!` (plus the names that macro itself needs) must be
/// in scope.
#[macro_export]
macro_rules! persist_chunk_into_table_in_existing_connection {
    ($table:expr, $chunk:expr, $conn:expr) => {{
        for batch in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
            insert_or_ignore_into!($table, batch, $conn);
        }
    }};
}
411
412 pub use blocking_call_is_ok_or_panic;
413 pub use read_only_blocking;
414 pub use read_only_repeatable_blocking;
415 pub use run_query;
416 pub use run_query_async;
417 pub use run_query_repeatable;
418 pub use run_query_repeatable_async;
419 pub use spawn_read_only_blocking;
420 pub use transactional_blocking_with_retry;
421}