1pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
6pub(crate) use indexer_store::*;
7pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
8pub use pg_indexer_store::PgIndexerStore;
9
10mod indexer_analytics_store;
11pub mod indexer_store;
12pub mod package_resolver;
13mod pg_indexer_analytical_store;
14mod pg_indexer_store;
15pub mod pg_partition_manager;
16
17pub mod diesel_macro {
use std::cell::RefCell;

thread_local! {
    /// Per-thread marker recording whether the current thread has executed a
    /// closure spawned by `spawn_read_only_blocking!`.
    ///
    /// It starts out `false`; `spawn_read_only_blocking!` sets it to `true`
    /// and `blocking_call_is_ok_or_panic!` reads it.
    pub static CALLED_FROM_BLOCKING_POOL: RefCell<bool> = const { RefCell::new(false) };
}
21
/// Runs `$query` on a pooled connection inside a transaction configured as
/// read-only and repeatable-read, blocking the current thread.
///
/// * `$pool` - forwarded to `get_pool_connection`.
/// * `$query` - handed to the transaction builder's `run`.
///
/// # Errors
///
/// A failure to obtain a connection is propagated with `?`, so the macro must
/// be expanded inside a function whose error type accepts it. An error
/// returned by the transaction is stringified into
/// `IndexerError::PostgresRead`.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! read_only_repeatable_blocking {
    ($pool:expr, $query:expr) => {{
        use downcast::Any;
        // `IndexerError` is imported here, like in the write and spawn
        // macros, so the expansion does not rely on the call site's imports.
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut pool_conn = get_pool_connection($pool)?;
        pool_conn
            .as_any_mut()
            .downcast_mut::<PoolConnection>()
            .expect("pooled connection must be a `PoolConnection`")
            .build_transaction()
            .read_only()
            .repeatable_read()
            .run($query)
            .map_err(|e| IndexerError::PostgresRead(e.to_string()))
    }};
}
40
/// Runs `$query` on a pooled connection inside a read-only transaction,
/// blocking the current thread.
///
/// * `$pool` - forwarded to `get_pool_connection`.
/// * `$query` - handed to the transaction builder's `run`.
///
/// # Errors
///
/// A failure to obtain a connection is propagated with `?`, so the macro must
/// be expanded inside a function whose error type accepts it. An error
/// returned by the transaction is stringified into
/// `IndexerError::PostgresRead`.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! read_only_blocking {
    ($pool:expr, $query:expr) => {{
        use downcast::Any;
        // `IndexerError` is imported here, like in the write and spawn
        // macros, so the expansion does not rely on the call site's imports.
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut pool_conn = get_pool_connection($pool)?;
        pool_conn
            .as_any_mut()
            .downcast_mut::<PoolConnection>()
            .expect("pooled connection must be a `PoolConnection`")
            .build_transaction()
            .read_only()
            .run($query)
            .map_err(|e| IndexerError::PostgresRead(e.to_string()))
    }};
}
58
/// Runs `$query` inside a read-write transaction, retrying with exponential
/// backoff until it succeeds or `$max_elapsed` has passed.
///
/// * `$pool` - forwarded to `get_pool_connection` on every attempt.
/// * `$query` - handed to the transaction builder's `run` on every attempt.
/// * `$max_elapsed` - stored as the backoff's `max_elapsed_time`.
///
/// Every failure, whether acquiring a connection or running the transaction,
/// is treated as transient and reported as `IndexerError::PostgresWrite`.
/// The macro evaluates to `Result<_, IndexerError>`.
///
/// NOTE(review): `.as_any_mut()` is called without importing a trait here;
/// presumably the call site must have `downcast::Any` in scope.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! transactional_blocking_with_retry {
    ($pool:expr, $query:expr, $max_elapsed:expr) => {{
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut retry_policy = backoff::ExponentialBackoff::default();
        retry_policy.max_elapsed_time = Some($max_elapsed);

        backoff::retry(retry_policy, || {
            // A connection failure is retried just like a query failure.
            let mut pool_conn =
                get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                    err: IndexerError::PostgresWrite(e.to_string()),
                    retry_after: None,
                })?;
            let attempt = pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .unwrap()
                .build_transaction()
                .read_write()
                .run($query);
            attempt.map_err(|e| {
                tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
                backoff::Error::Transient {
                    err: IndexerError::PostgresWrite(e.to_string()),
                    retry_after: None,
                }
            })
        })
        // Strip the backoff wrapper: both variants carry the same error type.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } | backoff::Error::Permanent(err) => err,
        })
    }};
}
97
/// Runs `$query` inside a read-write transaction, retrying with exponential
/// backoff unless `$abort_condition` says the error is not worth retrying.
///
/// * `$pool` - forwarded to `get_pool_connection` on every attempt.
/// * `$query` - handed to the transaction builder's `run` on every attempt.
/// * `$abort_condition` - called with a reference to the transaction error;
///   when it returns `true` the error is returned as-is and retrying stops.
/// * `$max_elapsed` - stored as the backoff's `max_elapsed_time`.
///
/// Connection failures are always retried. Retried transaction errors are
/// reported as `IndexerError::PostgresWrite`. The macro evaluates to
/// `Result<_, IndexerError>`.
///
/// NOTE(review): `.as_any_mut()` is called without importing a trait here;
/// presumably the call site must have `downcast::Any` in scope.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`.
#[macro_export]
macro_rules! transactional_blocking_with_retry_with_conditional_abort {
    ($pool:expr, $query:expr, $abort_condition:expr, $max_elapsed:expr) => {{
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
        };

        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some($max_elapsed);

        backoff::retry(backoff, || {
            let mut pool_conn =
                get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
                    err: IndexerError::PostgresWrite(e.to_string()),
                    retry_after: None,
                })?;
            pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .expect("pooled connection must be a `PoolConnection`")
                .build_transaction()
                .read_write()
                .run($query)
                .map_err(|e| {
                    // Decide first, then log, so the message matches what
                    // actually happens next.
                    if $abort_condition(&e) {
                        tracing::error!(
                            "Error with persisting data into DB: {:?}, aborting without retry",
                            e
                        );
                        backoff::Error::Permanent(e)
                    } else {
                        tracing::error!(
                            "Error with persisting data into DB: {:?}, retrying...",
                            e
                        );
                        backoff::Error::Transient {
                            err: IndexerError::PostgresWrite(e.to_string()),
                            retry_after: None,
                        }
                    }
                })
        })
        // Strip the backoff wrapper: both variants carry the same error type.
        .map_err(|failure| match failure {
            backoff::Error::Transient { err, .. } | backoff::Error::Permanent(err) => err,
        })
    }};
}
140
/// Runs `$query` in a read-only transaction on a thread obtained from
/// `tokio::task::spawn_blocking` and awaits the result.
///
/// * `$pool` - forwarded to `get_pool_connection` inside the spawned closure.
/// * `$query` - handed to the transaction builder's `run`.
/// * `$repeatable_read` - when `true`, the transaction is additionally
///   configured with `repeatable_read()`.
///
/// The spawned closure marks its thread through `CALLED_FROM_BLOCKING_POOL`
/// and enters the caller's current tracing span. The macro contains an
/// `.await`, so it must be expanded in an async context.
///
/// # Errors
///
/// Both a failure to obtain a connection and a transaction error are reported
/// as `IndexerError::PostgresRead`.
///
/// # Panics
///
/// Panics if the pooled connection cannot be downcast to `PoolConnection`, or
/// if the spawned task itself fails to complete ("Blocking call failed").
#[macro_export]
macro_rules! spawn_read_only_blocking {
    ($pool:expr, $query:expr, $repeatable_read:expr) => {{
        use downcast::Any;
        use $crate::{
            db::{PoolConnection, get_pool_connection},
            errors::IndexerError,
            store::diesel_macro::CALLED_FROM_BLOCKING_POOL,
        };
        let current_span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            CALLED_FROM_BLOCKING_POOL
                .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
            let _guard = current_span.enter();
            // Failing to get a connection is an ordinary runtime error; hand
            // it back to the caller instead of panicking the blocking task.
            let mut pool_conn = get_pool_connection($pool)
                .map_err(|e| IndexerError::PostgresRead(e.to_string()))?;
            let conn = pool_conn
                .as_any_mut()
                .downcast_mut::<PoolConnection>()
                .expect("pooled connection must be a `PoolConnection`");

            if $repeatable_read {
                conn.build_transaction()
                    .read_only()
                    .repeatable_read()
                    .run($query)
                    .map_err(|e| IndexerError::PostgresRead(e.to_string()))
            } else {
                conn.build_transaction()
                    .read_only()
                    .run($query)
                    .map_err(|e| IndexerError::PostgresRead(e.to_string()))
            }
        })
        .await
        .expect("Blocking call failed")
    }};
}
182
/// Inserts `$values` into `$table` over `$conn`, doing nothing for rows that
/// hit a conflict.
///
/// On failure the error is converted with `IndexerError::from`, annotated
/// with `"Failed to write to <table> DB"` and propagated with `?`. The call
/// site must therefore have `IndexerError` and a trait providing `.context`
/// in scope, and must return a compatible `Result`.
#[macro_export]
macro_rules! insert_or_ignore_into {
    ($table:expr, $values:expr, $conn:expr) => {{
        use diesel::RunQueryDsl;

        // Kept as a single statement so temporaries borrowed by the
        // arguments live until the query has executed.
        RunQueryDsl::execute(
            diesel::insert_into($table)
                .values($values)
                .on_conflict_do_nothing(),
            $conn,
        )
        .map_err(IndexerError::from)
        .context(concat!("Failed to write to ", stringify!($table), " DB"))?;
    }};
}
197
/// Inserts `$values` into `$table` over `$conn`; rows that conflict on
/// `$target` are updated with the assignments in `$pg_columns`.
///
/// A query error is propagated with `?`, so the call site must return a
/// `Result` whose error type accepts it. `ExpressionMethods` is imported for
/// the benefit of the expressions passed in.
#[macro_export]
macro_rules! on_conflict_do_update {
    ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
        use diesel::{ExpressionMethods, RunQueryDsl};

        // Kept as a single statement so temporaries borrowed by the
        // arguments live until the query has executed.
        RunQueryDsl::execute(
            diesel::insert_into($table)
                .values($values)
                .on_conflict($target)
                .do_update()
                .set($pg_columns),
            $conn,
        )?;
    }};
}
211
/// Inserts `$values` into `$table` over `$conn`; rows that conflict on
/// `$target` are updated with the assignments in `$pg_columns`, with
/// `$condition` applied as a filter on that update.
///
/// A query error is propagated with `?`, so the call site must return a
/// `Result` whose error type accepts it. `ExpressionMethods` is imported for
/// the benefit of the expressions passed in.
#[macro_export]
macro_rules! on_conflict_do_update_with_condition {
    ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
        use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};

        // Kept as a single statement so temporaries borrowed by the
        // arguments live until the query has executed.
        RunQueryDsl::execute(
            diesel::insert_into($table)
                .values($values)
                .on_conflict($target)
                .do_update()
                .set($pg_columns)
                .filter($condition),
            $conn,
        )?;
    }};
}
226
/// Blocking read-only query entry point.
///
/// First runs `blocking_call_is_ok_or_panic!` to check the calling thread,
/// then evaluates to `read_only_blocking!($pool, $query)`.
///
/// The helper macros are referenced through `$crate::` so the call site only
/// needs `run_query!` itself in scope.
#[macro_export]
macro_rules! run_query {
    ($pool:expr, $query:expr) => {{
        $crate::blocking_call_is_ok_or_panic!();
        $crate::read_only_blocking!($pool, $query)
    }};
}
234
/// Blocking read-only query entry point using a repeatable-read transaction.
///
/// First runs `blocking_call_is_ok_or_panic!` to check the calling thread,
/// then evaluates to `read_only_repeatable_blocking!($pool, $query)`.
///
/// The helper macros are referenced through `$crate::` so the call site only
/// needs `run_query_repeatable!` itself in scope.
#[macro_export]
macro_rules! run_query_repeatable {
    ($pool:expr, $query:expr) => {{
        $crate::blocking_call_is_ok_or_panic!();
        $crate::read_only_repeatable_blocking!($pool, $query)
    }};
}
242
/// Async read-only query entry point: evaluates to
/// `spawn_read_only_blocking!($pool, $query, false)`, i.e. without requesting
/// repeatable-read.
///
/// The helper macro is referenced through `$crate::` so the call site only
/// needs `run_query_async!` itself in scope.
#[macro_export]
macro_rules! run_query_async {
    ($pool:expr, $query:expr) => {{ $crate::spawn_read_only_blocking!($pool, $query, false) }};
}
247
/// Async read-only query entry point: evaluates to
/// `spawn_read_only_blocking!($pool, $query, true)`, i.e. requesting
/// repeatable-read.
///
/// The helper macro is referenced through `$crate::` so the call site only
/// needs `run_query_repeatable_async!` itself in scope.
#[macro_export]
macro_rules! run_query_repeatable_async {
    ($pool:expr, $query:expr) => {{ $crate::spawn_read_only_blocking!($pool, $query, true) }};
}
252
/// Guard against running a blocking DB call on an async thread.
///
/// Does nothing when `tokio::runtime::Handle::try_current()` fails, or when
/// the current thread's `CALLED_FROM_BLOCKING_POOL` flag is set.
///
/// # Panics
///
/// Panics when a runtime handle is available and the flag is not set.
#[macro_export]
macro_rules! blocking_call_is_ok_or_panic {
    () => {{
        use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;

        // The thread-local flag is only consulted once a runtime handle has
        // been found.
        if tokio::runtime::Handle::try_current().is_ok() {
            let on_blocking_pool = CALLED_FROM_BLOCKING_POOL.with(|flag| *flag.borrow());
            if !on_blocking_pool {
                panic!(
                    "You are calling a blocking DB operation directly on an async thread. \
                        Please use IndexerReader::spawn_blocking instead to move the \
                        operation to a blocking thread"
                );
            }
        }
    }};
}
276
/// Persists `$chunk` into `$table` in one retried read-write transaction and
/// logs the outcome.
///
/// * `$table` - target table; also stringified for the log messages.
/// * `$chunk` - rows to persist; it is evaluated for `.len()` up front and
///   again inside the transaction closure.
/// * `$pool` - forwarded to `transactional_blocking_with_retry!`.
///
/// The transaction is retried for up to `PG_DB_COMMIT_SLEEP_DURATION`, which
/// must be in scope at the call site, as must a trait providing
/// `tap_ok`/`tap_err` on `Result`. On success the elapsed seconds and row
/// count are logged at info level; on failure the error is logged at error
/// level. The macro evaluates to the transaction's `Result`.
///
/// Sibling macros, `IndexerError` and the tracing macros are referenced by
/// full path so they need not be imported where this macro is used.
#[macro_export]
macro_rules! persist_chunk_into_table {
    ($table:expr, $chunk:expr, $pool:expr) => {{
        let now = std::time::Instant::now();
        let chunk_len = $chunk.len();
        $crate::transactional_blocking_with_retry!(
            $pool,
            |conn| {
                $crate::persist_chunk_into_table_in_existing_connection!($table, $chunk, conn);
                Ok::<(), $crate::errors::IndexerError>(())
            },
            PG_DB_COMMIT_SLEEP_DURATION
        )
        .tap_ok(|_| {
            let elapsed = now.elapsed().as_secs_f64();
            tracing::info!(
                elapsed,
                "Persisted {} rows to {}",
                chunk_len,
                stringify!($table),
            );
        })
        .tap_err(|e| {
            tracing::error!("Failed to persist {} with error: {}", stringify!($table), e);
        })
    }};
}
351
/// Inserts `$chunk` into `$table` over an already-open connection, in batches
/// of at most `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` rows.
///
/// Each batch goes through `insert_or_ignore_into!`, which propagates errors
/// with `?`; the first failing batch therefore returns from the enclosing
/// function or closure. `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` must be in scope at
/// the call site.
///
/// The sibling macro is referenced through `$crate::` so it need not be
/// imported where this macro is used.
#[macro_export]
macro_rules! persist_chunk_into_table_in_existing_connection {
    ($table:expr, $chunk:expr, $conn:expr) => {{
        for batch in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
            $crate::insert_or_ignore_into!($table, batch, $conn);
        }
    }};
}
360
361 pub use blocking_call_is_ok_or_panic;
362 pub use read_only_blocking;
363 pub use read_only_repeatable_blocking;
364 pub use run_query;
365 pub use run_query_async;
366 pub use run_query_repeatable;
367 pub use run_query_repeatable_async;
368 pub use spawn_read_only_blocking;
369 pub use transactional_blocking_with_retry;
370}