iota_indexer/store/
pg_indexer_analytical_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use core::result::Result::Ok;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, dsl::count};
10use downcast::Any;
11use iota_types::base_types::ObjectID;
12use tap::tap::TapFallible;
13use tracing::{error, info};
14
15use super::IndexerAnalyticalStore;
16use crate::{
17    db::ConnectionPool,
18    errors::{Context, IndexerError},
19    models::{
20        address_metrics::StoredAddressMetrics,
21        checkpoints::StoredCheckpoint,
22        move_call_metrics::{
23            QueriedMoveCallMetrics, QueriedMoveMetrics, StoredMoveCallMetrics,
24            build_move_call_metric_query,
25        },
26        network_metrics::{StoredEpochPeakTps, Tps},
27        transactions::{
28            StoredTransaction, StoredTransactionCheckpoint, StoredTransactionSuccessCommandCount,
29            StoredTransactionTimestamp, TxSeq,
30        },
31        tx_count_metrics::StoredTxCountMetrics,
32    },
33    schema::{
34        active_addresses, address_metrics, addresses, checkpoints, epoch_peak_tps,
35        move_call_metrics, move_calls, transactions, tx_count_metrics,
36    },
37    store::diesel_macro::{read_only_blocking, transactional_blocking_with_retry},
38    types::IndexerResult,
39};
40
41/// The store for the indexer analytical data. Represents a Postgres
42/// implementation of the `IndexerAnalyticalStore` trait.
43#[derive(Clone)]
44pub struct PgIndexerAnalyticalStore {
45    blocking_cp: ConnectionPool,
46}
47
48impl PgIndexerAnalyticalStore {
49    pub fn new(blocking_cp: ConnectionPool) -> Self {
50        Self { blocking_cp }
51    }
52}
53
54#[async_trait]
55impl IndexerAnalyticalStore for PgIndexerAnalyticalStore {
56    async fn get_latest_stored_checkpoint(&self) -> IndexerResult<Option<StoredCheckpoint>> {
57        let latest_cp = read_only_blocking!(&self.blocking_cp, |conn| {
58            checkpoints::dsl::checkpoints
59                .order(checkpoints::sequence_number.desc())
60                .first::<StoredCheckpoint>(conn)
61                .optional()
62        })
63        .context("Failed reading latest checkpoint from PostgresDB")?;
64        Ok(latest_cp)
65    }
66
67    async fn get_latest_stored_transaction(&self) -> IndexerResult<Option<StoredTransaction>> {
68        let latest_tx = read_only_blocking!(&self.blocking_cp, |conn| {
69            transactions::dsl::transactions
70                .order(transactions::tx_sequence_number.desc())
71                .first::<StoredTransaction>(conn)
72                .optional()
73        })
74        .context("Failed reading latest transaction from PostgresDB")?;
75        Ok(latest_tx)
76    }
77
78    async fn get_checkpoints_in_range(
79        &self,
80        start_checkpoint: i64,
81        end_checkpoint: i64,
82    ) -> IndexerResult<Vec<StoredCheckpoint>> {
83        let cps = read_only_blocking!(&self.blocking_cp, |conn| {
84            checkpoints::dsl::checkpoints
85                .filter(checkpoints::sequence_number.ge(start_checkpoint))
86                .filter(checkpoints::sequence_number.lt(end_checkpoint))
87                .order(checkpoints::sequence_number.asc())
88                .load::<StoredCheckpoint>(conn)
89        })
90        .context("Failed reading checkpoints from PostgresDB")?;
91        Ok(cps)
92    }
93
94    async fn get_tx_timestamps_in_checkpoint_range(
95        &self,
96        start_checkpoint: i64,
97        end_checkpoint: i64,
98    ) -> IndexerResult<Vec<StoredTransactionTimestamp>> {
99        let tx_timestamps = read_only_blocking!(&self.blocking_cp, |conn| {
100            transactions::dsl::transactions
101                .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
102                .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
103                .order(transactions::dsl::tx_sequence_number.asc())
104                .select((
105                    transactions::dsl::tx_sequence_number,
106                    transactions::dsl::timestamp_ms,
107                ))
108                .load::<StoredTransactionTimestamp>(conn)
109        })
110        .context("Failed reading transaction timestamps from PostgresDB")?;
111        Ok(tx_timestamps)
112    }
113
114    async fn get_tx_checkpoints_in_checkpoint_range(
115        &self,
116        start_checkpoint: i64,
117        end_checkpoint: i64,
118    ) -> IndexerResult<Vec<StoredTransactionCheckpoint>> {
119        let tx_checkpoints = read_only_blocking!(&self.blocking_cp, |conn| {
120            transactions::dsl::transactions
121                .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
122                .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
123                .order(transactions::dsl::tx_sequence_number.asc())
124                .select((
125                    transactions::dsl::tx_sequence_number,
126                    transactions::dsl::checkpoint_sequence_number,
127                ))
128                .load::<StoredTransactionCheckpoint>(conn)
129        })
130        .context("Failed reading transaction checkpoints from PostgresDB")?;
131        Ok(tx_checkpoints)
132    }
133
134    async fn get_tx_success_cmd_counts_in_checkpoint_range(
135        &self,
136        start_checkpoint: i64,
137        end_checkpoint: i64,
138    ) -> IndexerResult<Vec<StoredTransactionSuccessCommandCount>> {
139        let tx_success_cmd_counts = read_only_blocking!(&self.blocking_cp, |conn| {
140            transactions::dsl::transactions
141                .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
142                .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
143                .order(transactions::dsl::tx_sequence_number.asc())
144                .select((
145                    transactions::dsl::tx_sequence_number,
146                    transactions::dsl::checkpoint_sequence_number,
147                    transactions::dsl::success_command_count,
148                    transactions::dsl::timestamp_ms,
149                ))
150                .load::<StoredTransactionSuccessCommandCount>(conn)
151        })
152        .context("Failed reading transaction success command counts from PostgresDB")?;
153        Ok(tx_success_cmd_counts)
154    }
155    async fn get_tx(&self, tx_sequence_number: i64) -> IndexerResult<Option<StoredTransaction>> {
156        let tx = read_only_blocking!(&self.blocking_cp, |conn| {
157            transactions::dsl::transactions
158                .filter(transactions::dsl::tx_sequence_number.eq(tx_sequence_number))
159                .first::<StoredTransaction>(conn)
160                .optional()
161        })
162        .context("Failed reading transaction from PostgresDB")?;
163        Ok(tx)
164    }
165
166    async fn get_cp(&self, sequence_number: i64) -> IndexerResult<Option<StoredCheckpoint>> {
167        let cp = read_only_blocking!(&self.blocking_cp, |conn| {
168            checkpoints::dsl::checkpoints
169                .filter(checkpoints::dsl::sequence_number.eq(sequence_number))
170                .first::<StoredCheckpoint>(conn)
171                .optional()
172        })
173        .context("Failed reading checkpoint from PostgresDB")?;
174        Ok(cp)
175    }
176
177    async fn get_latest_tx_count_metrics(&self) -> IndexerResult<Option<StoredTxCountMetrics>> {
178        let latest_tx_count = read_only_blocking!(&self.blocking_cp, |conn| {
179            tx_count_metrics::dsl::tx_count_metrics
180                .order(tx_count_metrics::dsl::checkpoint_sequence_number.desc())
181                .first::<StoredTxCountMetrics>(conn)
182                .optional()
183        })
184        .context("Failed reading latest tx count metrics from PostgresDB")?;
185        Ok(latest_tx_count)
186    }
187
188    async fn get_latest_epoch_peak_tps(&self) -> IndexerResult<Option<StoredEpochPeakTps>> {
189        let latest_network_metrics = read_only_blocking!(&self.blocking_cp, |conn| {
190            epoch_peak_tps::dsl::epoch_peak_tps
191                .order(epoch_peak_tps::dsl::epoch.desc())
192                .first::<StoredEpochPeakTps>(conn)
193                .optional()
194        })
195        .context("Failed reading latest epoch peak TPS from PostgresDB")?;
196        Ok(latest_network_metrics)
197    }
198
199    /// Persists the transaction count metrics for the given checkpoint range.
200    /// Start checkpoint is inclusive, end checkpoint is exclusive.
201    fn persist_tx_count_metrics(
202        &self,
203        start_checkpoint: i64,
204        end_checkpoint: i64,
205    ) -> IndexerResult<()> {
206        let tx_count_query = construct_checkpoint_tx_count_query(start_checkpoint, end_checkpoint);
207        info!(
208            "Persisting tx count metrics for checkpoints [{}-{}]",
209            start_checkpoint,
210            end_checkpoint - 1
211        );
212        transactional_blocking_with_retry!(
213            &self.blocking_cp,
214            |conn| {
215                diesel::sql_query(tx_count_query.clone()).execute(conn)?;
216                Ok::<(), IndexerError>(())
217            },
218            Duration::from_secs(10)
219        )
220        .context("Failed persisting tx count metrics to PostgresDB")?;
221        info!(
222            "Persisted tx count metrics for checkpoints [{}-{}]",
223            start_checkpoint,
224            end_checkpoint - 1
225        );
226        Ok(())
227    }
228
229    async fn persist_epoch_peak_tps(&self, epoch: i64) -> IndexerResult<()> {
230        let epoch_peak_tps_query = construct_peak_tps_query(epoch, 1);
231        let peak_tps_30d_query = construct_peak_tps_query(epoch, 30);
232        let epoch_tps: Tps =
233            read_only_blocking!(&self.blocking_cp, |conn| diesel::RunQueryDsl::get_result(
234                diesel::sql_query(epoch_peak_tps_query),
235                conn
236            ))
237            .context("Failed reading epoch peak TPS from PostgresDB")?;
238        let tps_30d: Tps =
239            read_only_blocking!(&self.blocking_cp, |conn| diesel::RunQueryDsl::get_result(
240                diesel::sql_query(peak_tps_30d_query),
241                conn
242            ))
243            .context("Failed reading 30d peak TPS from PostgresDB")?;
244
245        let epoch_peak_tps = StoredEpochPeakTps {
246            epoch,
247            peak_tps: epoch_tps.peak_tps,
248            peak_tps_30d: tps_30d.peak_tps,
249        };
250        transactional_blocking_with_retry!(
251            &self.blocking_cp,
252            |conn| {
253                diesel::insert_into(epoch_peak_tps::table)
254                    .values(epoch_peak_tps.clone())
255                    .on_conflict_do_nothing()
256                    .execute(conn)
257            },
258            Duration::from_secs(10)
259        )
260        .context("Failed persisting epoch peak TPS to PostgresDB.")?;
261        Ok(())
262    }
263
264    async fn get_address_metrics_last_processed_tx_seq(&self) -> IndexerResult<Option<TxSeq>> {
265        let last_processed_tx_seq = read_only_blocking!(&self.blocking_cp, |conn| {
266            active_addresses::dsl::active_addresses
267                .order(active_addresses::dsl::last_appearance_tx.desc())
268                .select((active_addresses::dsl::last_appearance_tx,))
269                .first::<TxSeq>(conn)
270                .optional()
271        })
272        .context("Failed to read address metrics last processed tx sequence.")?;
273        Ok(last_processed_tx_seq)
274    }
275
276    fn persist_addresses_in_tx_range(
277        &self,
278        start_tx_seq: i64,
279        end_tx_seq: i64,
280    ) -> IndexerResult<()> {
281        let address_persist_query = construct_address_persisting_query(start_tx_seq, end_tx_seq);
282        transactional_blocking_with_retry!(
283            &self.blocking_cp,
284            |conn| {
285                diesel::sql_query(address_persist_query.clone()).execute(conn)?;
286                Ok::<(), IndexerError>(())
287            },
288            Duration::from_secs(10)
289        )
290        .context("Failed persisting addresses to PostgresDB")?;
291        Ok(())
292    }
293
294    fn persist_active_addresses_in_tx_range(
295        &self,
296        start_tx_seq: i64,
297        end_tx_seq: i64,
298    ) -> IndexerResult<()> {
299        let active_address_persist_query =
300            construct_active_address_persisting_query(start_tx_seq, end_tx_seq);
301        transactional_blocking_with_retry!(
302            &self.blocking_cp,
303            |conn| {
304                diesel::sql_query(active_address_persist_query.clone()).execute(conn)?;
305                Ok::<(), IndexerError>(())
306            },
307            Duration::from_secs(10)
308        )
309        .context("Failed persisting active addresses to PostgresDB")?;
310        Ok(())
311    }
312
313    async fn calculate_and_persist_address_metrics(&self, checkpoint: i64) -> IndexerResult<()> {
314        let mut checkpoint_opt = self
315            .get_checkpoints_in_range(checkpoint, checkpoint + 1)
316            .await?
317            .pop();
318        while checkpoint_opt.is_none() {
319            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
320            checkpoint_opt = self
321                .get_checkpoints_in_range(checkpoint, checkpoint + 1)
322                .await?
323                .pop();
324        }
325        let checkpoint = checkpoint_opt.unwrap();
326        let cp_timestamp_ms = checkpoint.timestamp_ms;
327        let addr_count = read_only_blocking!(&self.blocking_cp, |conn| {
328            addresses::dsl::addresses
329                .filter(addresses::first_appearance_time.le(cp_timestamp_ms))
330                .count()
331                .get_result::<i64>(conn)
332        })?;
333        let active_addr_count = read_only_blocking!(&self.blocking_cp, |conn| {
334            active_addresses::dsl::active_addresses
335                .filter(active_addresses::first_appearance_time.le(cp_timestamp_ms))
336                .count()
337                .get_result::<i64>(conn)
338        })?;
339        let time_one_day_ago = cp_timestamp_ms - 1000 * 60 * 60 * 24;
340        let daily_active_addresses = read_only_blocking!(&self.blocking_cp, |conn| {
341            active_addresses::dsl::active_addresses
342                .filter(active_addresses::first_appearance_time.le(cp_timestamp_ms))
343                .filter(active_addresses::last_appearance_time.gt(time_one_day_ago))
344                .select(count(active_addresses::address))
345                .first(conn)
346        })?;
347        let address_metrics_to_commit = StoredAddressMetrics {
348            checkpoint: checkpoint.sequence_number,
349            epoch: checkpoint.epoch,
350            timestamp_ms: checkpoint.timestamp_ms,
351            cumulative_addresses: addr_count,
352            cumulative_active_addresses: active_addr_count,
353            daily_active_addresses,
354        };
355        transactional_blocking_with_retry!(
356            &self.blocking_cp,
357            |conn| {
358                diesel::insert_into(address_metrics::table)
359                    .values(address_metrics_to_commit.clone())
360                    .on_conflict_do_nothing()
361                    .execute(conn)
362            },
363            Duration::from_secs(60)
364        )
365        .context("Failed persisting address metrics to PostgresDB")?;
366        Ok(())
367    }
368
369    async fn get_latest_move_call_tx_seq(&self) -> IndexerResult<Option<TxSeq>> {
370        let last_processed_tx_seq = read_only_blocking!(&self.blocking_cp, |conn| {
371            move_calls::dsl::move_calls
372                .order(move_calls::dsl::transaction_sequence_number.desc())
373                .select((move_calls::dsl::transaction_sequence_number,))
374                .first::<TxSeq>(conn)
375                .optional()
376        })
377        .unwrap_or_default();
378        Ok(last_processed_tx_seq)
379    }
380
381    async fn get_latest_move_call_metrics(&self) -> IndexerResult<Option<StoredMoveCallMetrics>> {
382        let latest_move_call_metrics = read_only_blocking!(&self.blocking_cp, |conn| {
383            move_call_metrics::dsl::move_call_metrics
384                .order(move_call_metrics::epoch.desc())
385                .first::<QueriedMoveCallMetrics>(conn)
386                .optional()
387        })
388        .unwrap_or_default();
389        Ok(latest_move_call_metrics.map(|m| m.into()))
390    }
391
392    fn persist_move_calls_in_tx_range(
393        &self,
394        start_tx_seq: i64,
395        end_tx_seq: i64,
396    ) -> IndexerResult<()> {
397        let move_call_persist_query = construct_move_call_persist_query(start_tx_seq, end_tx_seq);
398        transactional_blocking_with_retry!(
399            &self.blocking_cp,
400            |conn| {
401                diesel::sql_query(move_call_persist_query.clone()).execute(conn)?;
402                Ok::<(), IndexerError>(())
403            },
404            Duration::from_secs(10)
405        )
406        .context("Failed persisting move calls to PostgresDB")?;
407        Ok(())
408    }
409
410    async fn calculate_and_persist_move_call_metrics(&self, epoch: i64) -> IndexerResult<()> {
411        let move_call_query_3d = build_move_call_metric_query(epoch, 3);
412        let move_call_query_7d = build_move_call_metric_query(epoch, 7);
413        let move_call_query_30d = build_move_call_metric_query(epoch, 30);
414
415        let mut calculate_tasks = vec![];
416        let blocking_cp_3d = self.blocking_cp.clone();
417        calculate_tasks.push(tokio::task::spawn_blocking(move || {
418            read_only_blocking!(&blocking_cp_3d, |conn| {
419                diesel::sql_query(move_call_query_3d).get_results::<QueriedMoveMetrics>(conn)
420            })
421        }));
422        let blocking_cp_7d = self.blocking_cp.clone();
423        calculate_tasks.push(tokio::task::spawn_blocking(move || {
424            read_only_blocking!(&blocking_cp_7d, |conn| {
425                diesel::sql_query(move_call_query_7d).get_results::<QueriedMoveMetrics>(conn)
426            })
427        }));
428        let blocking_cp_30d = self.blocking_cp.clone();
429        calculate_tasks.push(tokio::task::spawn_blocking(move || {
430            read_only_blocking!(&blocking_cp_30d, |conn| {
431                diesel::sql_query(move_call_query_30d).get_results::<QueriedMoveMetrics>(conn)
432            })
433        }));
434        let chained = futures::future::join_all(calculate_tasks)
435            .await
436            .into_iter()
437            .collect::<Result<Vec<_>, _>>()
438            .tap_err(|e| {
439                error!("Error joining move call calculation tasks: {:?}", e);
440            })?
441            .into_iter()
442            .collect::<Result<Vec<_>, _>>()
443            .tap_err(|e| {
444                error!("Error calculating move call metrics: {:?}", e);
445            })?
446            .into_iter()
447            .flatten()
448            .collect::<Vec<_>>();
449
450        let move_call_metrics: Vec<StoredMoveCallMetrics> = chained
451            .into_iter()
452            .filter_map(|queried_move_metrics| {
453                let package = ObjectID::from_bytes(queried_move_metrics.move_package.clone()).ok();
454                let package_str = match package {
455                    Some(p) => p.to_canonical_string(/* with_prefix */ true),
456                    None => {
457                        tracing::error!(
458                            "Failed to parse move package ID: {:?}",
459                            queried_move_metrics.move_package
460                        );
461                        return None;
462                    }
463                };
464                Some(StoredMoveCallMetrics {
465                    id: None,
466                    epoch,
467                    day: queried_move_metrics.day,
468                    move_package: package_str,
469                    move_module: queried_move_metrics.move_module,
470                    move_function: queried_move_metrics.move_function,
471                    count: queried_move_metrics.count,
472                })
473            })
474            .collect();
475
476        transactional_blocking_with_retry!(
477            &self.blocking_cp,
478            |conn| {
479                diesel::insert_into(move_call_metrics::table)
480                    .values(move_call_metrics.clone())
481                    .on_conflict_do_nothing()
482                    .execute(conn)
483            },
484            Duration::from_secs(60)
485        )
486        .context("Failed persisting move call metrics to PostgresDB")?;
487        Ok(())
488    }
489}
490
491fn construct_checkpoint_tx_count_query(start_checkpoint: i64, end_checkpoint: i64) -> String {
492    format!(
493        "WITH expanded_checkpoint_range AS (
494            SELECT
495                sequence_number AS checkpoint_sequence_number,
496                epoch,
497                generate_series(min_tx_sequence_number, max_tx_sequence_number) AS tx_sequence_number
498            FROM checkpoints
499            WHERE sequence_number >= {start_checkpoint} AND sequence_number < {end_checkpoint}
500        )
501
502        INSERT INTO tx_count_metrics
503        SELECT
504            ecr.checkpoint_sequence_number,
505            ecr.epoch,
506            MAX(t.timestamp_ms) AS timestamp_ms,
507            COUNT(*) AS total_transaction_blocks,
508            SUM(CASE WHEN t.success_command_count > 0 THEN 1 ELSE 0 END) AS total_successful_transaction_blocks,
509            SUM(t.success_command_count) AS total_successful_transactions
510        FROM expanded_checkpoint_range ecr
511        JOIN transactions t
512            ON t.tx_sequence_number = ecr.tx_sequence_number
513        GROUP BY ecr.checkpoint_sequence_number, ecr.epoch
514        ORDER BY ecr.checkpoint_sequence_number
515        ON CONFLICT (checkpoint_sequence_number) DO NOTHING;"
516    )
517}
518
519fn construct_peak_tps_query(epoch: i64, offset: i64) -> String {
520    format!(
521        "WITH filtered_checkpoints AS (
522            SELECT
523              MAX(checkpoint_sequence_number) AS checkpoint_sequence_number,
524              SUM(total_successful_transactions) AS total_successful_transactions,
525              timestamp_ms
526            FROM
527              tx_count_metrics
528              WHERE epoch > ({} - {}) AND epoch <= {}
529            GROUP BY
530              timestamp_ms
531          ),
532          tps_data AS (
533            SELECT
534              checkpoint_sequence_number,
535              total_successful_transactions,
536              timestamp_ms - LAG(timestamp_ms) OVER (ORDER BY timestamp_ms) AS time_diff
537            FROM
538              filtered_checkpoints
539          )
540          SELECT
541            MAX(total_successful_transactions * 1000.0 / time_diff)::float8 as peak_tps
542          FROM
543            tps_data
544          WHERE
545            time_diff IS NOT NULL;
546        ",
547        epoch, offset, epoch
548    )
549}
550
551fn construct_address_persisting_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
552    format!(
553        "WITH senders AS (
554        SELECT
555            s.sender AS address,
556            s.tx_sequence_number,
557            t.timestamp_ms
558        FROM tx_senders s
559        JOIN transactions t
560        ON s.tx_sequence_number = t.tx_sequence_number
561        WHERE s.tx_sequence_number >= {} AND s.tx_sequence_number < {}
562      ),
563      recipients AS (
564        SELECT
565            r.recipient AS address,
566            r.tx_sequence_number,
567            t.timestamp_ms
568        FROM tx_recipients r
569        JOIN transactions t
570        ON r.tx_sequence_number = t.tx_sequence_number
571        WHERE r.tx_sequence_number >= {} AND r.tx_sequence_number < {}
572      ),
573      union_address AS (
574        SELECT
575            address,
576            MIN(tx_sequence_number) as first_seq,
577            MIN(timestamp_ms) AS first_timestamp,
578            MAX(tx_sequence_number) as last_seq,
579            MAX(timestamp_ms) AS last_timestamp
580        FROM recipients GROUP BY address
581        UNION ALL
582        SELECT
583            address,
584            MIN(tx_sequence_number) as first_seq,
585            MIN(timestamp_ms) AS first_timestamp,
586            MAX(tx_sequence_number) as last_seq,
587            MAX(timestamp_ms) AS last_timestamp
588        FROM senders GROUP BY address
589      )
590      INSERT INTO addresses
591      SELECT
592        address,
593        MIN(first_seq) AS first_appearance_tx,
594        MIN(first_timestamp) AS first_appearance_time,
595        MAX(last_seq) AS last_appearance_tx,
596        MAX(last_timestamp) AS last_appearance_time
597      FROM union_address
598      GROUP BY address
599      ON CONFLICT (address) DO UPDATE
600      SET
601        last_appearance_tx = GREATEST(EXCLUDED.last_appearance_tx, addresses.last_appearance_tx),
602        last_appearance_time = GREATEST(EXCLUDED.last_appearance_time, addresses.last_appearance_time);
603    ",
604        start_tx_seq, end_tx_seq, start_tx_seq, end_tx_seq
605    )
606}
607
608fn construct_active_address_persisting_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
609    format!(
610        "WITH senders AS (
611        SELECT
612            s.sender AS address,
613            s.tx_sequence_number,
614            t.timestamp_ms
615        FROM tx_senders s
616        JOIN transactions t
617        ON s.tx_sequence_number = t.tx_sequence_number
618        WHERE s.tx_sequence_number >= {} AND s.tx_sequence_number < {}
619      )
620      INSERT INTO active_addresses
621      SELECT
622            address,
623            MIN(tx_sequence_number) AS first_appearance_tx,
624            MIN(timestamp_ms) AS first_appearance_time,
625            MAX(tx_sequence_number) AS last_appearance_tx,
626            MAX(timestamp_ms) AS last_appearance_time
627      FROM senders
628      GROUP BY address
629      ON CONFLICT (address) DO UPDATE
630      SET
631        last_appearance_tx = GREATEST(EXCLUDED.last_appearance_tx, active_addresses.last_appearance_tx),
632        last_appearance_time = GREATEST(EXCLUDED.last_appearance_time, active_addresses.last_appearance_time);
633    ",
634        start_tx_seq, end_tx_seq
635    )
636}
637
638fn construct_move_call_persist_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
639    format!(
640        "INSERT INTO move_calls
641    SELECT
642        m.tx_sequence_number AS transaction_sequence_number,
643        c.sequence_number AS checkpoint_sequence_number,
644        c.epoch AS epoch,
645        m.package AS move_package,
646        m.module AS move_module,
647        m.func AS move_function
648    FROM tx_calls_fun m
649    INNER JOIN transactions t
650        ON m.tx_sequence_number = t.tx_sequence_number
651    INNER JOIN checkpoints c
652        ON t.checkpoint_sequence_number = c.sequence_number
653    WHERE m.tx_sequence_number >= {} AND m.tx_sequence_number < {}
654    ON CONFLICT (transaction_sequence_number, move_package, move_module, move_function) DO NOTHING;
655    ",
656        start_tx_seq, end_tx_seq
657    )
658}