iota_indexer/processors/
move_call_metrics_processor.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use tap::tap::TapFallible;
6use tracing::{error, info};
7
8use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore, types::IndexerResult};
9
/// Default number of transactions handled per processing round; used when the
/// `MOVE_CALL_PROCESSOR_BATCH_SIZE` environment variable is unset or cannot be
/// parsed as a `usize`.
const MOVE_CALL_PROCESSOR_BATCH_SIZE: usize = 80_000;
/// Default divisor used to split one batch into concurrently persisted
/// chunks; used when the `MOVE_CALL_PROCESSOR_PARALLELISM` environment
/// variable is unset or cannot be parsed as a `usize`.
const PARALLELISM: usize = 10;
12
13pub struct MoveCallMetricsProcessor<S> {
14    pub store: S,
15    metrics: IndexerMetrics,
16    pub move_call_processor_batch_size: usize,
17    pub move_call_processor_parallelism: usize,
18}
19
20impl<S> MoveCallMetricsProcessor<S>
21where
22    S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
23{
24    pub fn new(store: S, metrics: IndexerMetrics) -> MoveCallMetricsProcessor<S> {
25        let move_call_processor_batch_size = std::env::var("MOVE_CALL_PROCESSOR_BATCH_SIZE")
26            .map(|s| s.parse::<usize>().unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE))
27            .unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE);
28        let move_call_processor_parallelism = std::env::var("MOVE_CALL_PROCESSOR_PARALLELISM")
29            .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
30            .unwrap_or(PARALLELISM);
31        Self {
32            store,
33            metrics,
34            move_call_processor_batch_size,
35            move_call_processor_parallelism,
36        }
37    }
38
39    pub async fn start(&self) -> IndexerResult<()> {
40        info!("Indexer move call metrics async processor started...");
41        let latest_move_call_tx_seq = self.store.get_latest_move_call_tx_seq().await?;
42        let mut last_processed_tx_seq = latest_move_call_tx_seq.unwrap_or_default().seq;
43        let latest_move_call_epoch = self.store.get_latest_move_call_metrics().await?;
44        let mut last_processed_epoch = latest_move_call_epoch.unwrap_or_default().epoch;
45        loop {
46            let mut latest_tx = self.store.get_latest_stored_transaction().await?;
47            while if let Some(tx) = latest_tx {
48                tx.tx_sequence_number
49                    < last_processed_tx_seq + self.move_call_processor_batch_size as i64
50            } else {
51                true
52            } {
53                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
54                latest_tx = self.store.get_latest_stored_transaction().await?;
55            }
56
57            let batch_size = self.move_call_processor_batch_size;
58            let step_size = batch_size / self.move_call_processor_parallelism;
59            let mut persist_tasks = vec![];
60            for chunk_start_tx_seq in (last_processed_tx_seq + 1
61                ..last_processed_tx_seq + batch_size as i64 + 1)
62                .step_by(step_size)
63            {
64                let move_call_store = self.store.clone();
65                persist_tasks.push(tokio::task::spawn_blocking(move || {
66                    move_call_store.persist_move_calls_in_tx_range(
67                        chunk_start_tx_seq,
68                        chunk_start_tx_seq + step_size as i64,
69                    )
70                }));
71            }
72            futures::future::join_all(persist_tasks)
73                .await
74                .into_iter()
75                .collect::<Result<Vec<_>, _>>()
76                .tap_err(|e| {
77                    error!("Error joining move call persist tasks: {:?}", e);
78                })?
79                .into_iter()
80                .collect::<Result<Vec<_>, _>>()
81                .tap_err(|e| {
82                    error!("Error persisting move calls: {:?}", e);
83                })?;
84            last_processed_tx_seq += batch_size as i64;
85            info!("Persisted move_calls at tx seq: {}", last_processed_tx_seq);
86            self.metrics
87                .latest_move_call_metrics_tx_seq
88                .set(last_processed_tx_seq);
89
90            let mut tx = self.store.get_tx(last_processed_tx_seq).await?;
91            while tx.is_none() {
92                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
93                tx = self.store.get_tx(last_processed_tx_seq).await?;
94            }
95            let cp_seq = tx.unwrap().checkpoint_sequence_number;
96            let mut cp = self.store.get_cp(cp_seq).await?;
97            while cp.is_none() {
98                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
99                cp = self.store.get_cp(cp_seq).await?;
100            }
101            let end_epoch = cp.unwrap().epoch;
102            for epoch in last_processed_epoch + 1..end_epoch {
103                self.store
104                    .calculate_and_persist_move_call_metrics(epoch)
105                    .await?;
106                info!("Persisted move_call_metrics for epoch: {}", epoch);
107            }
108            last_processed_epoch = end_epoch - 1;
109        }
110    }
111}