1pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
6pub(crate) use indexer_store::*;
7pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
8pub use pg_indexer_store::{PgIndexerStore, TxGlobalOrderCursor};
9
10mod indexer_analytics_store;
11pub mod indexer_store;
12pub mod package_resolver;
13mod pg_indexer_analytical_store;
14mod pg_indexer_store;
15pub mod pg_partition_manager;
16
17pub mod diesel_macro {
thread_local! {
    /// Per-thread marker read by `blocking_call_is_ok_or_panic!`.
    ///
    /// Starts as `false` on every thread and is switched to `true` by
    /// [`mark_in_blocking_pool`].
    pub static CALLED_FROM_BLOCKING_POOL: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
}

/// Flags the calling thread by setting [`CALLED_FROM_BLOCKING_POOL`] to `true`.
///
/// Only the current thread's copy of the flag is touched; nothing in this
/// function ever resets it back to `false`.
pub fn mark_in_blocking_pool() {
    CALLED_FROM_BLOCKING_POOL.with(|flag| {
        // `replace` hands back the previous value, which is of no interest here.
        let _previous = flag.replace(true);
    });
}
29
/// Runs `$query` on a pooled connection inside a transaction configured with
/// `.read_only().repeatable_read()`, blocking the current thread.
///
/// * `$pool` - passed to `get_pool_connection`; a failure there is propagated
///   with `?`, so the expansion must sit in a function or closure whose error
///   type accepts it.
/// * `$query` - handed to the transaction builder's `run`.
///
/// Evaluates to a `Result` whose error is `IndexerError::PostgresRead` carrying
/// the stringified transaction error.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! read_only_repeatable_blocking {
    ($pool:expr, $query:expr) => {{
        use downcast::Any;
        // `IndexerError` is imported here, as in the transactional macros, so
        // call sites do not need to have it in scope themselves.
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut pool_conn = get_pool_connection($pool)?;
        pool_conn
            .as_any_mut()
            .downcast_mut::<PoolConnection>()
            .unwrap()
            .build_transaction()
            .read_only()
            .repeatable_read()
            .run($query)
            .map_err(|e| IndexerError::PostgresRead(e.to_string()))
    }};
}
48
/// Runs `$query` on a pooled connection inside a transaction configured with
/// `.read_only()`, blocking the current thread.
///
/// * `$pool` - passed to `get_pool_connection`; a failure there is propagated
///   with `?`, so the expansion must sit in a function or closure whose error
///   type accepts it.
/// * `$query` - handed to the transaction builder's `run`.
///
/// Evaluates to a `Result` whose error is `IndexerError::PostgresRead` carrying
/// the stringified transaction error.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! read_only_blocking {
    ($pool:expr, $query:expr) => {{
        use downcast::Any;
        // `IndexerError` is imported here, as in the transactional macros, so
        // call sites do not need to have it in scope themselves.
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut pool_conn = get_pool_connection($pool)?;
        pool_conn
            .as_any_mut()
            .downcast_mut::<PoolConnection>()
            .unwrap()
            .build_transaction()
            .read_only()
            .run($query)
            .map_err(|e| IndexerError::PostgresRead(e.to_string()))
    }};
}
66
/// Runs `$query` in a `.read_write()` transaction, retrying with exponential
/// backoff until it succeeds or `$max_elapsed` has passed.
///
/// Every attempt fetches a connection via `get_pool_connection($pool)` and
/// re-evaluates `$query`. Both connection failures and transaction failures
/// are reported to `backoff` as transient, wrapped in
/// `IndexerError::PostgresWrite`.
///
/// Evaluates to `Result<_, IndexerError>`: the value of the first successful
/// attempt, or the error `backoff::retry` gave up with.
///
/// NOTE(review): `as_any_mut` is called without importing a trait here; the
/// read-only macros import `downcast::Any` for the same call, so call sites of
/// this macro presumably need that trait in scope.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! transactional_blocking_with_retry {
    ($pool:expr, $query:expr, $max_elapsed:expr) => {{
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut retry_policy = backoff::ExponentialBackoff::default();
        retry_policy.max_elapsed_time = Some($max_elapsed);

        backoff::retry(retry_policy, || {
            let mut pool_conn =
                get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                    err: IndexerError::PostgresWrite(e.to_string()),
                    retry_after: None,
                })?;
            pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .unwrap()
                .build_transaction()
                .read_write()
                .run($query)
                .map_err(|e| {
                    tracing::error!("error with persisting data into DB: {e:?}, retrying...");
                    backoff::Error::Transient {
                        err: IndexerError::PostgresWrite(e.to_string()),
                        retry_after: None,
                    }
                })
        })
        // Strip the backoff wrapper so callers only ever see the inner error.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } => err,
            backoff::Error::Permanent(err) => err,
        })
    }};
}
105
/// Like `transactional_blocking_with_retry!`, but stops retrying as soon as
/// `$abort_condition` accepts the error of a failed attempt.
///
/// * `$pool` - passed to `get_pool_connection` on every attempt; a failure is
///   treated as transient.
/// * `$query` - run in a `.read_write()` transaction on every attempt.
/// * `$abort_condition` - called as `$abort_condition(&e)` with the
///   transaction error; `true` makes the error permanent (returned as-is),
///   `false` wraps it in `IndexerError::PostgresWrite` and retries.
/// * `$max_elapsed` - upper bound on the total time spent retrying.
///
/// Evaluates to a `Result` holding either the value of the first successful
/// attempt or the error the retry loop ended with.
///
/// NOTE(review): `as_any_mut` is called without importing a trait here; the
/// read-only macros import `downcast::Any` for the same call, so call sites of
/// this macro presumably need that trait in scope.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! transactional_blocking_with_retry_with_conditional_abort {
    ($pool:expr, $query:expr, $abort_condition:expr, $max_elapsed:expr) => {{
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some($max_elapsed);
        backoff::retry(backoff, || {
            let mut pool_conn =
                get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                    err: IndexerError::PostgresWrite(e.to_string()),
                    retry_after: None,
                })?;
            pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .unwrap()
                .build_transaction()
                .read_write()
                .run($query)
                .map_err(|e| {
                    // Decide first, then log, so the message says what will
                    // actually happen next instead of always claiming a retry.
                    if $abort_condition(&e) {
                        tracing::error!("error with persisting data into DB: {e:?}, aborting...");
                        backoff::Error::Permanent(e)
                    } else {
                        tracing::error!("error with persisting data into DB: {e:?}, retrying...");
                        backoff::Error::Transient {
                            err: IndexerError::PostgresWrite(e.to_string()),
                            retry_after: None,
                        }
                    }
                })
        })
        // Strip the backoff wrapper so callers only ever see the inner error.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } => err,
            backoff::Error::Permanent(err) => err,
        })
    }};
}
148
/// Runs a read-only query on Tokio's blocking pool and awaits its result.
///
/// The expansion contains `.await`, so it must be used in an async context.
/// `$pool` and `$query` are moved into the spawned closure.
///
/// * `$pool` - passed to `get_pool_connection` on the blocking thread.
/// * `$query` - handed to the transaction builder's `run`.
/// * `$repeatable_read` - when `true` the transaction is additionally
///   configured with `.repeatable_read()`.
///
/// Inside the closure the thread is first flagged with
/// `mark_in_blocking_pool`, and the span that was current at the call site is
/// entered for the duration of the query.
///
/// Evaluates to `Result<_, IndexerError>`; both a failure to obtain a
/// connection and a failed transaction are reported as
/// `IndexerError::PostgresRead`.
///
/// # Panics
///
/// Panics with "blocking call failed" if awaiting the spawned task yields an
/// error, and if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! spawn_read_only_blocking {
    ($pool:expr, $query:expr, $repeatable_read:expr) => {{
        use downcast::Any;
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
            store::diesel_macro::mark_in_blocking_pool,
        };
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            mark_in_blocking_pool();
            let _guard = current_span.enter();
            // Failing to get a connection is a runtime condition, not a bug:
            // hand it back as an error instead of panicking the blocking task.
            let mut pool_conn = match get_pool_connection($pool) {
                Ok(conn) => conn,
                Err(e) => return Err(IndexerError::PostgresRead(e.to_string())),
            };

            if $repeatable_read {
                pool_conn
                    .as_any_mut()
                    .downcast_mut::<PoolConnection>()
                    .unwrap()
                    .build_transaction()
                    .read_only()
                    .repeatable_read()
                    .run($query)
                    .map_err(|e| IndexerError::PostgresRead(e.to_string()))
            } else {
                pool_conn
                    .as_any_mut()
                    .downcast_mut::<PoolConnection>()
                    .unwrap()
                    .build_transaction()
                    .read_only()
                    .run($query)
                    .map_err(|e| IndexerError::PostgresRead(e.to_string()))
            }
        })
        .await
        .expect("blocking call failed")
    }};
}
189
/// Inserts `$values` into `$table` over `$conn`, doing nothing for rows that
/// conflict with existing ones.
///
/// The expansion is a statement block (it evaluates to `()`); a failed insert
/// is converted with `IndexerError::from`, annotated with
/// `"failed to write to <table> DB"` and propagated with `?`.
///
/// `IndexerError` and the trait providing `.context(..)` are not imported by
/// the macro, so both have to be in scope where it is expanded.
#[macro_export]
macro_rules! insert_or_ignore_into {
    ($table:expr, $values:expr, $conn:expr) => {{
        use diesel::RunQueryDsl;

        // Kept as one expression chain so temporaries inside `$values` live
        // until the statement has executed.
        diesel::insert_into($table)
            .values($values)
            .on_conflict_do_nothing()
            .execute($conn)
            .map_err(IndexerError::from)
            .context(concat!("failed to write to ", stringify!($table), " DB"))?;
    }};
}
204
/// Inserts `$values` into `$table` over `$conn`; on a conflict on `$target`
/// the statement switches to an update that applies `$pg_columns`.
///
/// The expansion is a statement block (it evaluates to `()`); an error from
/// `execute` is propagated with `?`, so the enclosing function's error type
/// must accept it.
#[macro_export]
macro_rules! on_conflict_do_update {
    ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
        use diesel::{ExpressionMethods, RunQueryDsl};

        // Built and executed as a single expression chain.
        diesel::insert_into($table)
            .values($values)
            .on_conflict($target)
            .do_update()
            .set($pg_columns)
            .execute($conn)?;
    }};
}
218
/// Inserts `$values` into `$table` over `$conn`; on a conflict on `$target`
/// the statement switches to an update that applies `$pg_columns`, with
/// `$condition` attached through `.filter(..)`.
///
/// The expansion is a statement block (it evaluates to `()`); an error from
/// `execute` is propagated with `?`, so the enclosing function's error type
/// must accept it.
#[macro_export]
macro_rules! on_conflict_do_update_with_condition {
    ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
        // `FilterDsl` is what makes `.filter(..)` available on the statement.
        use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};

        // Built and executed as a single expression chain.
        diesel::insert_into($table)
            .values($values)
            .on_conflict($target)
            .do_update()
            .set($pg_columns)
            .filter($condition)
            .execute($conn)?;
    }};
}
233
/// Guarded variant of `read_only_blocking!`.
///
/// First expands `blocking_call_is_ok_or_panic!`, then evaluates to
/// `read_only_blocking!($pool, $query)`. Both helper macros are invoked by
/// bare name, so they must be resolvable where this macro is expanded.
#[macro_export]
macro_rules! run_query {
    ($pool:expr, $query:expr) => {{
        // Guard runs before any connection is requested.
        blocking_call_is_ok_or_panic!();
        read_only_blocking!($pool, $query)
    }};
}
241
/// Guarded variant of `read_only_repeatable_blocking!`.
///
/// First expands `blocking_call_is_ok_or_panic!`, then evaluates to
/// `read_only_repeatable_blocking!($pool, $query)`. Both helper macros are
/// invoked by bare name, so they must be resolvable where this macro is
/// expanded.
#[macro_export]
macro_rules! run_query_repeatable {
    ($pool:expr, $query:expr) => {{
        // Guard runs before any connection is requested.
        blocking_call_is_ok_or_panic!();
        read_only_repeatable_blocking!($pool, $query)
    }};
}
249
/// Guarded read-only query that is retried with exponential backoff until it
/// succeeds or `$max_elapsed` has passed.
///
/// `blocking_call_is_ok_or_panic!` is expanded once up front. Each attempt
/// then expands `read_only_blocking!($pool, $query)`; an error it evaluates to
/// is logged and reported to `backoff` as transient.
///
/// Evaluates to a `Result` holding either the value of the first successful
/// attempt or the error `backoff::retry` gave up with.
#[macro_export]
macro_rules! run_query_with_retry {
    ($pool:expr, $query:expr, $max_elapsed:expr) => {{
        blocking_call_is_ok_or_panic!();

        let mut retry_policy = backoff::ExponentialBackoff::default();
        retry_policy.max_elapsed_time = Some($max_elapsed);

        backoff::retry(retry_policy, || {
            read_only_blocking!($pool, $query).map_err(|e| {
                tracing::error!("error with reading data from DB: {e:?}, retrying...");
                backoff::Error::Transient {
                    err: e,
                    retry_after: None,
                }
            })
        })
        // Strip the backoff wrapper so callers only ever see the inner error.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } => err,
            backoff::Error::Permanent(err) => err,
        })
    }};
}
273
/// Shorthand for `spawn_read_only_blocking!($pool, $query, false)`, i.e. the
/// variant without `.repeatable_read()`.
#[macro_export]
macro_rules! run_query_async {
    ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, false) }};
}
278
/// Shorthand for `spawn_read_only_blocking!($pool, $query, true)`, i.e. the
/// variant that adds `.repeatable_read()`.
#[macro_export]
macro_rules! run_query_repeatable_async {
    ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, true) }};
}
283
/// Guard for blocking DB macros.
///
/// # Panics
///
/// Panics when `tokio::runtime::Handle::try_current()` succeeds on the calling
/// thread while that thread's `CALLED_FROM_BLOCKING_POOL` flag is still
/// `false`. Does nothing otherwise.
#[macro_export]
macro_rules! blocking_call_is_ok_or_panic {
    () => {{
        use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;

        // The thread-local flag is only consulted when a runtime handle is
        // reachable from this thread.
        if tokio::runtime::Handle::try_current().is_ok() {
            let marked_as_blocking = CALLED_FROM_BLOCKING_POOL.with(|flag| *flag.borrow());
            if !marked_as_blocking {
                panic!(
                    "you are calling a blocking DB operation directly on an async thread. \
                    Please use IndexerReader::spawn_blocking instead to move the \
                    operation to a blocking thread"
                );
            }
        }
    }};
}
307
/// Writes `$chunk` into `$table` through `transactional_blocking_with_retry!`
/// and logs the outcome.
///
/// The rows are written by
/// `persist_chunk_into_table_in_existing_connection!` inside the retried
/// transaction, with `PG_DB_COMMIT_SLEEP_DURATION` as the retry time limit.
/// On success the row count and elapsed seconds are logged at info level; on
/// failure the error is logged at error level. The `Result` of the
/// transactional macro is passed through unchanged.
///
/// `IndexerError`, `PG_DB_COMMIT_SLEEP_DURATION`, `info!`, the nested macros
/// and the trait providing `tap_ok` / `tap_err` are not imported here and must
/// be in scope where the macro is expanded.
#[macro_export]
macro_rules! persist_chunk_into_table {
    ($table:expr, $chunk:expr, $pool:expr) => {{
        let started_at = std::time::Instant::now();
        let row_count = $chunk.len();

        let outcome = transactional_blocking_with_retry!(
            $pool,
            |conn| {
                persist_chunk_into_table_in_existing_connection!($table, $chunk, conn);
                Ok::<(), IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        );

        outcome
            .tap_ok(|_| {
                // The binding must stay named `elapsed`: it is recorded as a
                // structured field of that name.
                let elapsed = started_at.elapsed().as_secs_f64();
                info!(
                    elapsed,
                    "Persisted {} rows to {}",
                    row_count,
                    stringify!($table),
                );
            })
            .tap_err(|e| {
                tracing::error!("failed to persist {} with error: {e}", stringify!($table));
            })
    }};
}
382
/// Writes `$chunk` into `$table` over an already open `$conn`, one
/// `insert_or_ignore_into!` statement per slice of at most
/// `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` items.
///
/// Evaluates to `()`. A failing insert leaves the enclosing function early
/// through the `?` inside `insert_or_ignore_into!`.
/// `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` must be in scope where the macro is
/// expanded.
#[macro_export]
macro_rules! persist_chunk_into_table_in_existing_connection {
    ($table:expr, $chunk:expr, $conn:expr) => {{
        for batch in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
            insert_or_ignore_into!($table, batch, $conn);
        }
    }};
}
391
392 pub use blocking_call_is_ok_or_panic;
393 pub use read_only_blocking;
394 pub use read_only_repeatable_blocking;
395 pub use run_query;
396 pub use run_query_async;
397 pub use run_query_repeatable;
398 pub use run_query_repeatable_async;
399 pub use spawn_read_only_blocking;
400 pub use transactional_blocking_with_retry;
401}