iota_indexer/store/
mod.rs1pub(crate) use indexer_analytics_store::IndexerAnalyticalStore;
6pub(crate) use indexer_store::*;
7pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
8pub use pg_indexer_store::PgIndexerStore;
9
10mod indexer_analytics_store;
11pub mod indexer_store;
12pub mod package_resolver;
13mod pg_indexer_analytical_store;
14mod pg_indexer_store;
15pub mod pg_partition_manager;
16
17pub mod diesel_macro {
18 thread_local! {
19 pub static CALLED_FROM_BLOCKING_POOL: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
20 }
21
22 #[macro_export]
23 macro_rules! read_only_repeatable_blocking {
24 ($pool:expr, $query:expr) => {{
25 use downcast::Any;
26 use $crate::db::{PoolConnection, get_pool_connection};
27
28 let mut pool_conn = get_pool_connection($pool)?;
29 pool_conn
30 .as_any_mut()
31 .downcast_mut::<PoolConnection>()
32 .unwrap()
33 .build_transaction()
34 .read_only()
35 .repeatable_read()
36 .run($query)
37 .map_err(|e| IndexerError::PostgresRead(e.to_string()))
38 }};
39 }
40
41 #[macro_export]
42 macro_rules! read_only_blocking {
43 ($pool:expr, $query:expr) => {{
44 use downcast::Any;
45 use $crate::db::{PoolConnection, get_pool_connection};
46
47 let mut pool_conn = get_pool_connection($pool)?;
48 pool_conn
49 .as_any_mut()
50 .downcast_mut::<PoolConnection>()
51 .unwrap()
52 .build_transaction()
53 .read_only()
54 .run($query)
55 .map_err(|e| IndexerError::PostgresRead(e.to_string()))
56 }};
57 }
58
59 #[macro_export]
60 macro_rules! transactional_blocking_with_retry {
61 ($pool:expr, $query:expr, $max_elapsed:expr) => {{
62 use $crate::{
63 db::{PoolConnection, get_pool_connection},
64 errors::IndexerError,
65 };
66 let mut backoff = backoff::ExponentialBackoff::default();
67 backoff.max_elapsed_time = Some($max_elapsed);
68 let result = match backoff::retry(backoff, || {
69 let mut pool_conn =
70 get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
71 err: IndexerError::PostgresWrite(e.to_string()),
72 retry_after: None,
73 })?;
74 pool_conn
75 .as_any_mut()
76 .downcast_mut::<PoolConnection>()
77 .unwrap()
78 .build_transaction()
79 .read_write()
80 .run($query)
81 .map_err(|e| {
82 tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
83 backoff::Error::Transient {
84 err: IndexerError::PostgresWrite(e.to_string()),
85 retry_after: None,
86 }
87 })
88 }) {
89 Ok(v) => Ok(v),
90 Err(backoff::Error::Transient { err, .. }) => Err(err),
91 Err(backoff::Error::Permanent(err)) => Err(err),
92 };
93
94 result
95 }};
96 }
97
98 #[macro_export]
99 macro_rules! spawn_read_only_blocking {
100 ($pool:expr, $query:expr, $repeatable_read:expr) => {{
101 use downcast::Any;
102 use $crate::{
103 db::{PoolConnection, get_pool_connection},
104 errors::IndexerError,
105 store::diesel_macro::CALLED_FROM_BLOCKING_POOL,
106 };
107 let current_span = tracing::Span::current();
108 tokio::task::spawn_blocking(move || {
109 CALLED_FROM_BLOCKING_POOL
110 .with(|in_blocking_pool| *in_blocking_pool.borrow_mut() = true);
111 let _guard = current_span.enter();
112 let mut pool_conn = get_pool_connection($pool).unwrap();
113
114 if $repeatable_read {
115 pool_conn
116 .as_any_mut()
117 .downcast_mut::<PoolConnection>()
118 .unwrap()
119 .build_transaction()
120 .read_only()
121 .repeatable_read()
122 .run($query)
123 .map_err(|e| IndexerError::PostgresRead(e.to_string()))
124 } else {
125 pool_conn
126 .as_any_mut()
127 .downcast_mut::<PoolConnection>()
128 .unwrap()
129 .build_transaction()
130 .read_only()
131 .run($query)
132 .map_err(|e| IndexerError::PostgresRead(e.to_string()))
133 }
134 })
135 .await
136 .expect("Blocking call failed")
137 }};
138 }
139
140 #[macro_export]
141 macro_rules! insert_or_ignore_into {
142 ($table:expr, $values:expr, $conn:expr) => {{
143 use diesel::RunQueryDsl;
144 let error_message = concat!("Failed to write to ", stringify!($table), " DB");
145
146 diesel::insert_into($table)
147 .values($values)
148 .on_conflict_do_nothing()
149 .execute($conn)
150 .map_err(IndexerError::from)
151 .context(error_message)?;
152 }};
153 }
154
155 #[macro_export]
156 macro_rules! on_conflict_do_update {
157 ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $conn:expr) => {{
158 use diesel::{ExpressionMethods, RunQueryDsl};
159
160 diesel::insert_into($table)
161 .values($values)
162 .on_conflict($target)
163 .do_update()
164 .set($pg_columns)
165 .execute($conn)?;
166 }};
167 }
168
169 #[macro_export]
170 macro_rules! run_query {
171 ($pool:expr, $query:expr) => {{
172 blocking_call_is_ok_or_panic!();
173 read_only_blocking!($pool, $query)
174 }};
175 }
176
177 #[macro_export]
178 macro_rules! run_query_repeatable {
179 ($pool:expr, $query:expr) => {{
180 blocking_call_is_ok_or_panic!();
181 read_only_repeatable_blocking!($pool, $query)
182 }};
183 }
184
185 #[macro_export]
186 macro_rules! run_query_async {
187 ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, false) }};
188 }
189
190 #[macro_export]
191 macro_rules! run_query_repeatable_async {
192 ($pool:expr, $query:expr) => {{ spawn_read_only_blocking!($pool, $query, true) }};
193 }
194
195 #[macro_export]
204 macro_rules! blocking_call_is_ok_or_panic {
205 () => {{
206 use $crate::store::diesel_macro::CALLED_FROM_BLOCKING_POOL;
207 if tokio::runtime::Handle::try_current().is_ok()
208 && !CALLED_FROM_BLOCKING_POOL.with(|in_blocking_pool| *in_blocking_pool.borrow())
209 {
210 panic!(
211 "You are calling a blocking DB operation directly on an async thread. \
212 Please use IndexerReader::spawn_blocking instead to move the \
213 operation to a blocking thread"
214 );
215 }
216 }};
217 }
218
219 #[macro_export]
220 macro_rules! persist_chunk_into_table {
221 ($table:expr, $chunk:expr, $pool:expr) => {{
222 let now = std::time::Instant::now();
223 let chunk_len = $chunk.len();
224 transactional_blocking_with_retry!(
225 $pool,
226 |conn| {
227 for chunk in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
228 insert_or_ignore_into!($table, chunk, conn);
229 }
230 Ok::<(), IndexerError>(())
231 },
232 PG_DB_COMMIT_SLEEP_DURATION
233 )
234 .tap_ok(|_| {
235 let elapsed = now.elapsed().as_secs_f64();
236 info!(
237 elapsed,
238 "Persisted {} rows to {}",
239 chunk_len,
240 stringify!($table),
241 );
242 })
243 .tap_err(|e| {
244 tracing::error!("Failed to persist {} with error: {}", stringify!($table), e);
245 })
246 }};
247 }
248
249 pub use blocking_call_is_ok_or_panic;
250 pub use read_only_blocking;
251 pub use read_only_repeatable_blocking;
252 pub use run_query;
253 pub use run_query_async;
254 pub use run_query_repeatable;
255 pub use run_query_repeatable_async;
256 pub use spawn_read_only_blocking;
257 pub use transactional_blocking_with_retry;
258}