1use core::result::Result::Ok;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, dsl::count};
10use downcast::Any;
11use iota_types::base_types::ObjectID;
12use tap::tap::TapFallible;
13use tracing::{error, info};
14
15use super::IndexerAnalyticalStore;
16use crate::{
17 db::ConnectionPool,
18 errors::{Context, IndexerError},
19 models::{
20 address_metrics::StoredAddressMetrics,
21 checkpoints::StoredCheckpoint,
22 move_call_metrics::{
23 QueriedMoveCallMetrics, QueriedMoveMetrics, StoredMoveCallMetrics,
24 build_move_call_metric_query,
25 },
26 network_metrics::{StoredEpochPeakTps, Tps},
27 transactions::{
28 StoredTransaction, StoredTransactionCheckpoint, StoredTransactionSuccessCommandCount,
29 StoredTransactionTimestamp, TxSeq,
30 },
31 tx_count_metrics::StoredTxCountMetrics,
32 },
33 schema::{
34 active_addresses, address_metrics, addresses, checkpoints, epoch_peak_tps,
35 move_call_metrics, move_calls, transactions, tx_count_metrics,
36 },
37 store::diesel_macro::{read_only_blocking, transactional_blocking_with_retry},
38 types::IndexerResult,
39};
40
41#[derive(Clone)]
44pub struct PgIndexerAnalyticalStore {
45 blocking_cp: ConnectionPool,
46}
47
48impl PgIndexerAnalyticalStore {
49 pub fn new(blocking_cp: ConnectionPool) -> Self {
50 Self { blocking_cp }
51 }
52}
53
54#[async_trait]
55impl IndexerAnalyticalStore for PgIndexerAnalyticalStore {
56 async fn get_latest_stored_checkpoint(&self) -> IndexerResult<Option<StoredCheckpoint>> {
57 let latest_cp = read_only_blocking!(&self.blocking_cp, |conn| {
58 checkpoints::dsl::checkpoints
59 .order(checkpoints::sequence_number.desc())
60 .first::<StoredCheckpoint>(conn)
61 .optional()
62 })
63 .context("Failed reading latest checkpoint from PostgresDB")?;
64 Ok(latest_cp)
65 }
66
67 async fn get_latest_stored_transaction(&self) -> IndexerResult<Option<StoredTransaction>> {
68 let latest_tx = read_only_blocking!(&self.blocking_cp, |conn| {
69 transactions::dsl::transactions
70 .order(transactions::tx_sequence_number.desc())
71 .first::<StoredTransaction>(conn)
72 .optional()
73 })
74 .context("Failed reading latest transaction from PostgresDB")?;
75 Ok(latest_tx)
76 }
77
78 async fn get_checkpoints_in_range(
79 &self,
80 start_checkpoint: i64,
81 end_checkpoint: i64,
82 ) -> IndexerResult<Vec<StoredCheckpoint>> {
83 let cps = read_only_blocking!(&self.blocking_cp, |conn| {
84 checkpoints::dsl::checkpoints
85 .filter(checkpoints::sequence_number.ge(start_checkpoint))
86 .filter(checkpoints::sequence_number.lt(end_checkpoint))
87 .order(checkpoints::sequence_number.asc())
88 .load::<StoredCheckpoint>(conn)
89 })
90 .context("Failed reading checkpoints from PostgresDB")?;
91 Ok(cps)
92 }
93
94 async fn get_tx_timestamps_in_checkpoint_range(
95 &self,
96 start_checkpoint: i64,
97 end_checkpoint: i64,
98 ) -> IndexerResult<Vec<StoredTransactionTimestamp>> {
99 let tx_timestamps = read_only_blocking!(&self.blocking_cp, |conn| {
100 transactions::dsl::transactions
101 .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
102 .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
103 .order(transactions::dsl::tx_sequence_number.asc())
104 .select((
105 transactions::dsl::tx_sequence_number,
106 transactions::dsl::timestamp_ms,
107 ))
108 .load::<StoredTransactionTimestamp>(conn)
109 })
110 .context("Failed reading transaction timestamps from PostgresDB")?;
111 Ok(tx_timestamps)
112 }
113
114 async fn get_tx_checkpoints_in_checkpoint_range(
115 &self,
116 start_checkpoint: i64,
117 end_checkpoint: i64,
118 ) -> IndexerResult<Vec<StoredTransactionCheckpoint>> {
119 let tx_checkpoints = read_only_blocking!(&self.blocking_cp, |conn| {
120 transactions::dsl::transactions
121 .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
122 .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
123 .order(transactions::dsl::tx_sequence_number.asc())
124 .select((
125 transactions::dsl::tx_sequence_number,
126 transactions::dsl::checkpoint_sequence_number,
127 ))
128 .load::<StoredTransactionCheckpoint>(conn)
129 })
130 .context("Failed reading transaction checkpoints from PostgresDB")?;
131 Ok(tx_checkpoints)
132 }
133
134 async fn get_tx_success_cmd_counts_in_checkpoint_range(
135 &self,
136 start_checkpoint: i64,
137 end_checkpoint: i64,
138 ) -> IndexerResult<Vec<StoredTransactionSuccessCommandCount>> {
139 let tx_success_cmd_counts = read_only_blocking!(&self.blocking_cp, |conn| {
140 transactions::dsl::transactions
141 .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
142 .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
143 .order(transactions::dsl::tx_sequence_number.asc())
144 .select((
145 transactions::dsl::tx_sequence_number,
146 transactions::dsl::checkpoint_sequence_number,
147 transactions::dsl::success_command_count,
148 transactions::dsl::timestamp_ms,
149 ))
150 .load::<StoredTransactionSuccessCommandCount>(conn)
151 })
152 .context("Failed reading transaction success command counts from PostgresDB")?;
153 Ok(tx_success_cmd_counts)
154 }
155 async fn get_tx(&self, tx_sequence_number: i64) -> IndexerResult<Option<StoredTransaction>> {
156 let tx = read_only_blocking!(&self.blocking_cp, |conn| {
157 transactions::dsl::transactions
158 .filter(transactions::dsl::tx_sequence_number.eq(tx_sequence_number))
159 .first::<StoredTransaction>(conn)
160 .optional()
161 })
162 .context("Failed reading transaction from PostgresDB")?;
163 Ok(tx)
164 }
165
166 async fn get_cp(&self, sequence_number: i64) -> IndexerResult<Option<StoredCheckpoint>> {
167 let cp = read_only_blocking!(&self.blocking_cp, |conn| {
168 checkpoints::dsl::checkpoints
169 .filter(checkpoints::dsl::sequence_number.eq(sequence_number))
170 .first::<StoredCheckpoint>(conn)
171 .optional()
172 })
173 .context("Failed reading checkpoint from PostgresDB")?;
174 Ok(cp)
175 }
176
177 async fn get_latest_tx_count_metrics(&self) -> IndexerResult<Option<StoredTxCountMetrics>> {
178 let latest_tx_count = read_only_blocking!(&self.blocking_cp, |conn| {
179 tx_count_metrics::dsl::tx_count_metrics
180 .order(tx_count_metrics::dsl::checkpoint_sequence_number.desc())
181 .first::<StoredTxCountMetrics>(conn)
182 .optional()
183 })
184 .context("Failed reading latest tx count metrics from PostgresDB")?;
185 Ok(latest_tx_count)
186 }
187
188 async fn get_latest_epoch_peak_tps(&self) -> IndexerResult<Option<StoredEpochPeakTps>> {
189 let latest_network_metrics = read_only_blocking!(&self.blocking_cp, |conn| {
190 epoch_peak_tps::dsl::epoch_peak_tps
191 .order(epoch_peak_tps::dsl::epoch.desc())
192 .first::<StoredEpochPeakTps>(conn)
193 .optional()
194 })
195 .context("Failed reading latest epoch peak TPS from PostgresDB")?;
196 Ok(latest_network_metrics)
197 }
198
199 fn persist_tx_count_metrics(
202 &self,
203 start_checkpoint: i64,
204 end_checkpoint: i64,
205 ) -> IndexerResult<()> {
206 let tx_count_query = construct_checkpoint_tx_count_query(start_checkpoint, end_checkpoint);
207 info!(
208 "Persisting tx count metrics for checkpoints [{}-{}]",
209 start_checkpoint,
210 end_checkpoint - 1
211 );
212 transactional_blocking_with_retry!(
213 &self.blocking_cp,
214 |conn| {
215 diesel::sql_query(tx_count_query.clone()).execute(conn)?;
216 Ok::<(), IndexerError>(())
217 },
218 Duration::from_secs(10)
219 )
220 .context("Failed persisting tx count metrics to PostgresDB")?;
221 info!(
222 "Persisted tx count metrics for checkpoints [{}-{}]",
223 start_checkpoint,
224 end_checkpoint - 1
225 );
226 Ok(())
227 }
228
229 async fn persist_epoch_peak_tps(&self, epoch: i64) -> IndexerResult<()> {
230 let epoch_peak_tps_query = construct_peak_tps_query(epoch, 1);
231 let peak_tps_30d_query = construct_peak_tps_query(epoch, 30);
232 let epoch_tps: Tps =
233 read_only_blocking!(&self.blocking_cp, |conn| diesel::RunQueryDsl::get_result(
234 diesel::sql_query(epoch_peak_tps_query),
235 conn
236 ))
237 .context("Failed reading epoch peak TPS from PostgresDB")?;
238 let tps_30d: Tps =
239 read_only_blocking!(&self.blocking_cp, |conn| diesel::RunQueryDsl::get_result(
240 diesel::sql_query(peak_tps_30d_query),
241 conn
242 ))
243 .context("Failed reading 30d peak TPS from PostgresDB")?;
244
245 let epoch_peak_tps = StoredEpochPeakTps {
246 epoch,
247 peak_tps: epoch_tps.peak_tps,
248 peak_tps_30d: tps_30d.peak_tps,
249 };
250 transactional_blocking_with_retry!(
251 &self.blocking_cp,
252 |conn| {
253 diesel::insert_into(epoch_peak_tps::table)
254 .values(epoch_peak_tps.clone())
255 .on_conflict_do_nothing()
256 .execute(conn)
257 },
258 Duration::from_secs(10)
259 )
260 .context("Failed persisting epoch peak TPS to PostgresDB.")?;
261 Ok(())
262 }
263
264 async fn get_address_metrics_last_processed_tx_seq(&self) -> IndexerResult<Option<TxSeq>> {
265 let last_processed_tx_seq = read_only_blocking!(&self.blocking_cp, |conn| {
266 active_addresses::dsl::active_addresses
267 .order(active_addresses::dsl::last_appearance_tx.desc())
268 .select((active_addresses::dsl::last_appearance_tx,))
269 .first::<TxSeq>(conn)
270 .optional()
271 })
272 .context("Failed to read address metrics last processed tx sequence.")?;
273 Ok(last_processed_tx_seq)
274 }
275
276 fn persist_addresses_in_tx_range(
277 &self,
278 start_tx_seq: i64,
279 end_tx_seq: i64,
280 ) -> IndexerResult<()> {
281 let address_persist_query = construct_address_persisting_query(start_tx_seq, end_tx_seq);
282 transactional_blocking_with_retry!(
283 &self.blocking_cp,
284 |conn| {
285 diesel::sql_query(address_persist_query.clone()).execute(conn)?;
286 Ok::<(), IndexerError>(())
287 },
288 Duration::from_secs(10)
289 )
290 .context("Failed persisting addresses to PostgresDB")?;
291 Ok(())
292 }
293
294 fn persist_active_addresses_in_tx_range(
295 &self,
296 start_tx_seq: i64,
297 end_tx_seq: i64,
298 ) -> IndexerResult<()> {
299 let active_address_persist_query =
300 construct_active_address_persisting_query(start_tx_seq, end_tx_seq);
301 transactional_blocking_with_retry!(
302 &self.blocking_cp,
303 |conn| {
304 diesel::sql_query(active_address_persist_query.clone()).execute(conn)?;
305 Ok::<(), IndexerError>(())
306 },
307 Duration::from_secs(10)
308 )
309 .context("Failed persisting active addresses to PostgresDB")?;
310 Ok(())
311 }
312
313 async fn calculate_and_persist_address_metrics(&self, checkpoint: i64) -> IndexerResult<()> {
314 let mut checkpoint_opt = self
315 .get_checkpoints_in_range(checkpoint, checkpoint + 1)
316 .await?
317 .pop();
318 while checkpoint_opt.is_none() {
319 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
320 checkpoint_opt = self
321 .get_checkpoints_in_range(checkpoint, checkpoint + 1)
322 .await?
323 .pop();
324 }
325 let checkpoint = checkpoint_opt.unwrap();
326 let cp_timestamp_ms = checkpoint.timestamp_ms;
327 let addr_count = read_only_blocking!(&self.blocking_cp, |conn| {
328 addresses::dsl::addresses
329 .filter(addresses::first_appearance_time.le(cp_timestamp_ms))
330 .count()
331 .get_result::<i64>(conn)
332 })?;
333 let active_addr_count = read_only_blocking!(&self.blocking_cp, |conn| {
334 active_addresses::dsl::active_addresses
335 .filter(active_addresses::first_appearance_time.le(cp_timestamp_ms))
336 .count()
337 .get_result::<i64>(conn)
338 })?;
339 let time_one_day_ago = cp_timestamp_ms - 1000 * 60 * 60 * 24;
340 let daily_active_addresses = read_only_blocking!(&self.blocking_cp, |conn| {
341 active_addresses::dsl::active_addresses
342 .filter(active_addresses::first_appearance_time.le(cp_timestamp_ms))
343 .filter(active_addresses::last_appearance_time.gt(time_one_day_ago))
344 .select(count(active_addresses::address))
345 .first(conn)
346 })?;
347 let address_metrics_to_commit = StoredAddressMetrics {
348 checkpoint: checkpoint.sequence_number,
349 epoch: checkpoint.epoch,
350 timestamp_ms: checkpoint.timestamp_ms,
351 cumulative_addresses: addr_count,
352 cumulative_active_addresses: active_addr_count,
353 daily_active_addresses,
354 };
355 transactional_blocking_with_retry!(
356 &self.blocking_cp,
357 |conn| {
358 diesel::insert_into(address_metrics::table)
359 .values(address_metrics_to_commit.clone())
360 .on_conflict_do_nothing()
361 .execute(conn)
362 },
363 Duration::from_secs(60)
364 )
365 .context("Failed persisting address metrics to PostgresDB")?;
366 Ok(())
367 }
368
369 async fn get_latest_move_call_tx_seq(&self) -> IndexerResult<Option<TxSeq>> {
370 let last_processed_tx_seq = read_only_blocking!(&self.blocking_cp, |conn| {
371 move_calls::dsl::move_calls
372 .order(move_calls::dsl::transaction_sequence_number.desc())
373 .select((move_calls::dsl::transaction_sequence_number,))
374 .first::<TxSeq>(conn)
375 .optional()
376 })
377 .unwrap_or_default();
378 Ok(last_processed_tx_seq)
379 }
380
381 async fn get_latest_move_call_metrics(&self) -> IndexerResult<Option<StoredMoveCallMetrics>> {
382 let latest_move_call_metrics = read_only_blocking!(&self.blocking_cp, |conn| {
383 move_call_metrics::dsl::move_call_metrics
384 .order(move_call_metrics::epoch.desc())
385 .first::<QueriedMoveCallMetrics>(conn)
386 .optional()
387 })
388 .unwrap_or_default();
389 Ok(latest_move_call_metrics.map(|m| m.into()))
390 }
391
392 fn persist_move_calls_in_tx_range(
393 &self,
394 start_tx_seq: i64,
395 end_tx_seq: i64,
396 ) -> IndexerResult<()> {
397 let move_call_persist_query = construct_move_call_persist_query(start_tx_seq, end_tx_seq);
398 transactional_blocking_with_retry!(
399 &self.blocking_cp,
400 |conn| {
401 diesel::sql_query(move_call_persist_query.clone()).execute(conn)?;
402 Ok::<(), IndexerError>(())
403 },
404 Duration::from_secs(10)
405 )
406 .context("Failed persisting move calls to PostgresDB")?;
407 Ok(())
408 }
409
410 async fn calculate_and_persist_move_call_metrics(&self, epoch: i64) -> IndexerResult<()> {
411 let move_call_query_3d = build_move_call_metric_query(epoch, 3);
412 let move_call_query_7d = build_move_call_metric_query(epoch, 7);
413 let move_call_query_30d = build_move_call_metric_query(epoch, 30);
414
415 let mut calculate_tasks = vec![];
416 let blocking_cp_3d = self.blocking_cp.clone();
417 calculate_tasks.push(tokio::task::spawn_blocking(move || {
418 read_only_blocking!(&blocking_cp_3d, |conn| {
419 diesel::sql_query(move_call_query_3d).get_results::<QueriedMoveMetrics>(conn)
420 })
421 }));
422 let blocking_cp_7d = self.blocking_cp.clone();
423 calculate_tasks.push(tokio::task::spawn_blocking(move || {
424 read_only_blocking!(&blocking_cp_7d, |conn| {
425 diesel::sql_query(move_call_query_7d).get_results::<QueriedMoveMetrics>(conn)
426 })
427 }));
428 let blocking_cp_30d = self.blocking_cp.clone();
429 calculate_tasks.push(tokio::task::spawn_blocking(move || {
430 read_only_blocking!(&blocking_cp_30d, |conn| {
431 diesel::sql_query(move_call_query_30d).get_results::<QueriedMoveMetrics>(conn)
432 })
433 }));
434 let chained = futures::future::join_all(calculate_tasks)
435 .await
436 .into_iter()
437 .collect::<Result<Vec<_>, _>>()
438 .tap_err(|e| {
439 error!("Error joining move call calculation tasks: {:?}", e);
440 })?
441 .into_iter()
442 .collect::<Result<Vec<_>, _>>()
443 .tap_err(|e| {
444 error!("Error calculating move call metrics: {:?}", e);
445 })?
446 .into_iter()
447 .flatten()
448 .collect::<Vec<_>>();
449
450 let move_call_metrics: Vec<StoredMoveCallMetrics> = chained
451 .into_iter()
452 .filter_map(|queried_move_metrics| {
453 let package = ObjectID::from_bytes(queried_move_metrics.move_package.clone()).ok();
454 let package_str = match package {
455 Some(p) => p.to_canonical_string(true),
456 None => {
457 tracing::error!(
458 "Failed to parse move package ID: {:?}",
459 queried_move_metrics.move_package
460 );
461 return None;
462 }
463 };
464 Some(StoredMoveCallMetrics {
465 id: None,
466 epoch,
467 day: queried_move_metrics.day,
468 move_package: package_str,
469 move_module: queried_move_metrics.move_module,
470 move_function: queried_move_metrics.move_function,
471 count: queried_move_metrics.count,
472 })
473 })
474 .collect();
475
476 transactional_blocking_with_retry!(
477 &self.blocking_cp,
478 |conn| {
479 diesel::insert_into(move_call_metrics::table)
480 .values(move_call_metrics.clone())
481 .on_conflict_do_nothing()
482 .execute(conn)
483 },
484 Duration::from_secs(60)
485 )
486 .context("Failed persisting move call metrics to PostgresDB")?;
487 Ok(())
488 }
489}
490
/// Builds the SQL statement that aggregates per-checkpoint transaction counts
/// for checkpoints in `[start_checkpoint, end_checkpoint)` and inserts them
/// into `tx_count_metrics`, skipping checkpoints that already have a row.
fn construct_checkpoint_tx_count_query(start_checkpoint: i64, end_checkpoint: i64) -> String {
    format!(
        "WITH expanded_checkpoint_range AS (
            SELECT
                sequence_number AS checkpoint_sequence_number,
                epoch,
                generate_series(min_tx_sequence_number, max_tx_sequence_number) AS tx_sequence_number
            FROM checkpoints
            WHERE sequence_number >= {lower} AND sequence_number < {upper}
        )

        INSERT INTO tx_count_metrics
        SELECT
            ecr.checkpoint_sequence_number,
            ecr.epoch,
            MAX(t.timestamp_ms) AS timestamp_ms,
            COUNT(*) AS total_transaction_blocks,
            SUM(CASE WHEN t.success_command_count > 0 THEN 1 ELSE 0 END) AS total_successful_transaction_blocks,
            SUM(t.success_command_count) AS total_successful_transactions
        FROM expanded_checkpoint_range ecr
        JOIN transactions t
            ON t.tx_sequence_number = ecr.tx_sequence_number
        GROUP BY ecr.checkpoint_sequence_number, ecr.epoch
        ORDER BY ecr.checkpoint_sequence_number
        ON CONFLICT (checkpoint_sequence_number) DO NOTHING;",
        lower = start_checkpoint,
        upper = end_checkpoint,
    )
}
518
/// Builds the SQL query that returns, as `peak_tps`, the highest rate of
/// successful transactions per second between consecutive distinct
/// timestamps in `tx_count_metrics`, restricted to epochs in
/// `(epoch - offset, epoch]`.
fn construct_peak_tps_query(epoch: i64, offset: i64) -> String {
    format!(
        "WITH filtered_checkpoints AS (
            SELECT
                MAX(checkpoint_sequence_number) AS checkpoint_sequence_number,
                SUM(total_successful_transactions) AS total_successful_transactions,
                timestamp_ms
            FROM
                tx_count_metrics
            WHERE epoch > ({epoch} - {offset}) AND epoch <= {epoch}
            GROUP BY
                timestamp_ms
        ),
        tps_data AS (
            SELECT
                checkpoint_sequence_number,
                total_successful_transactions,
                timestamp_ms - LAG(timestamp_ms) OVER (ORDER BY timestamp_ms) AS time_diff
            FROM
                filtered_checkpoints
        )
        SELECT
            MAX(total_successful_transactions * 1000.0 / time_diff)::float8 as peak_tps
        FROM
            tps_data
        WHERE
            time_diff IS NOT NULL;
        "
    )
}
550
/// Builds the SQL statement that upserts into `addresses` every sender and
/// recipient of transactions in `[start_tx_seq, end_tx_seq)`.
///
/// New addresses get their first/last appearance from this range; on
/// conflict only the last-appearance columns are advanced.
fn construct_address_persisting_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
    format!(
        "WITH senders AS (
            SELECT
                s.sender AS address,
                s.tx_sequence_number,
                t.timestamp_ms
            FROM tx_senders s
            JOIN transactions t
                ON s.tx_sequence_number = t.tx_sequence_number
            WHERE s.tx_sequence_number >= {start_tx_seq} AND s.tx_sequence_number < {end_tx_seq}
        ),
        recipients AS (
            SELECT
                r.recipient AS address,
                r.tx_sequence_number,
                t.timestamp_ms
            FROM tx_recipients r
            JOIN transactions t
                ON r.tx_sequence_number = t.tx_sequence_number
            WHERE r.tx_sequence_number >= {start_tx_seq} AND r.tx_sequence_number < {end_tx_seq}
        ),
        union_address AS (
            SELECT
                address,
                MIN(tx_sequence_number) as first_seq,
                MIN(timestamp_ms) AS first_timestamp,
                MAX(tx_sequence_number) as last_seq,
                MAX(timestamp_ms) AS last_timestamp
            FROM recipients GROUP BY address
            UNION ALL
            SELECT
                address,
                MIN(tx_sequence_number) as first_seq,
                MIN(timestamp_ms) AS first_timestamp,
                MAX(tx_sequence_number) as last_seq,
                MAX(timestamp_ms) AS last_timestamp
            FROM senders GROUP BY address
        )
        INSERT INTO addresses
        SELECT
            address,
            MIN(first_seq) AS first_appearance_tx,
            MIN(first_timestamp) AS first_appearance_time,
            MAX(last_seq) AS last_appearance_tx,
            MAX(last_timestamp) AS last_appearance_time
        FROM union_address
        GROUP BY address
        ON CONFLICT (address) DO UPDATE
        SET
            last_appearance_tx = GREATEST(EXCLUDED.last_appearance_tx, addresses.last_appearance_tx),
            last_appearance_time = GREATEST(EXCLUDED.last_appearance_time, addresses.last_appearance_time);
        "
    )
}
607
/// Builds the SQL statement that upserts into `active_addresses` every
/// sender of transactions in `[start_tx_seq, end_tx_seq)`.
///
/// On conflict only the last-appearance columns are advanced.
fn construct_active_address_persisting_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
    format!(
        "WITH senders AS (
            SELECT
                s.sender AS address,
                s.tx_sequence_number,
                t.timestamp_ms
            FROM tx_senders s
            JOIN transactions t
                ON s.tx_sequence_number = t.tx_sequence_number
            WHERE s.tx_sequence_number >= {start_tx_seq} AND s.tx_sequence_number < {end_tx_seq}
        )
        INSERT INTO active_addresses
        SELECT
            address,
            MIN(tx_sequence_number) AS first_appearance_tx,
            MIN(timestamp_ms) AS first_appearance_time,
            MAX(tx_sequence_number) AS last_appearance_tx,
            MAX(timestamp_ms) AS last_appearance_time
        FROM senders
        GROUP BY address
        ON CONFLICT (address) DO UPDATE
        SET
            last_appearance_tx = GREATEST(EXCLUDED.last_appearance_tx, active_addresses.last_appearance_tx),
            last_appearance_time = GREATEST(EXCLUDED.last_appearance_time, active_addresses.last_appearance_time);
        "
    )
}
637
/// Builds the SQL statement that copies the move calls of transactions in
/// `[start_tx_seq, end_tx_seq)` from `tx_calls_fun` into `move_calls`,
/// joined with their checkpoint and epoch; conflicting rows are skipped.
fn construct_move_call_persist_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
    format!(
        "INSERT INTO move_calls
        SELECT
            m.tx_sequence_number AS transaction_sequence_number,
            c.sequence_number AS checkpoint_sequence_number,
            c.epoch AS epoch,
            m.package AS move_package,
            m.module AS move_module,
            m.func AS move_function
        FROM tx_calls_fun m
        INNER JOIN transactions t
            ON m.tx_sequence_number = t.tx_sequence_number
        INNER JOIN checkpoints c
            ON t.checkpoint_sequence_number = c.sequence_number
        WHERE m.tx_sequence_number >= {start_tx_seq} AND m.tx_sequence_number < {end_tx_seq}
        ON CONFLICT (transaction_sequence_number, move_package, move_module, move_function) DO NOTHING;
        "
    )
}