use tap::tap::TapFallible;
use tracing::{error, info};

use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore, types::IndexerResult};

/// Default number of transactions fetched and persisted per batch; can be
/// overridden via the `MOVE_CALL_PROCESSOR_BATCH_SIZE` env var (see `new`).
const MOVE_CALL_PROCESSOR_BATCH_SIZE: usize = 80000;
/// Default number of concurrent persist tasks per batch; can be overridden
/// via the `MOVE_CALL_PROCESSOR_PARALLELISM` env var (see `new`).
const PARALLELISM: usize = 10;

/// Asynchronous processor that derives per-epoch Move call metrics from
/// stored transactions and persists them through the analytical store.
pub struct MoveCallMetricsProcessor<S> {
    /// Analytical store used both to read transactions/checkpoints and to
    /// persist the computed move-call data and metrics.
    pub store: S,
    // Metrics handle; `start` updates `latest_move_call_metrics_tx_seq`.
    metrics: IndexerMetrics,
    /// Transactions processed per batch (env-overridable in `new`).
    pub move_call_processor_batch_size: usize,
    /// Concurrent persist tasks per batch (env-overridable in `new`).
    pub move_call_processor_parallelism: usize,
}
19
20impl<S> MoveCallMetricsProcessor<S>
21where
22 S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
23{
24 pub fn new(store: S, metrics: IndexerMetrics) -> MoveCallMetricsProcessor<S> {
25 let move_call_processor_batch_size = std::env::var("MOVE_CALL_PROCESSOR_BATCH_SIZE")
26 .map(|s| s.parse::<usize>().unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE))
27 .unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE);
28 let move_call_processor_parallelism = std::env::var("MOVE_CALL_PROCESSOR_PARALLELISM")
29 .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
30 .unwrap_or(PARALLELISM);
31 Self {
32 store,
33 metrics,
34 move_call_processor_batch_size,
35 move_call_processor_parallelism,
36 }
37 }
38
39 pub async fn start(&self) -> IndexerResult<()> {
40 info!("Indexer move call metrics async processor started...");
41 let latest_move_call_tx_seq = self.store.get_latest_move_call_tx_seq().await?;
42 let mut last_processed_tx_seq = latest_move_call_tx_seq.unwrap_or_default().seq;
43 let latest_move_call_epoch = self.store.get_latest_move_call_metrics().await?;
44 let mut last_processed_epoch = latest_move_call_epoch.unwrap_or_default().epoch;
45 loop {
46 let mut latest_tx = self.store.get_latest_stored_transaction().await?;
47 while if let Some(tx) = latest_tx {
48 tx.tx_sequence_number
49 < last_processed_tx_seq + self.move_call_processor_batch_size as i64
50 } else {
51 true
52 } {
53 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
54 latest_tx = self.store.get_latest_stored_transaction().await?;
55 }
56
57 let batch_size = self.move_call_processor_batch_size;
58 let step_size = batch_size / self.move_call_processor_parallelism;
59 let mut persist_tasks = vec![];
60 for chunk_start_tx_seq in (last_processed_tx_seq + 1
61 ..last_processed_tx_seq + batch_size as i64 + 1)
62 .step_by(step_size)
63 {
64 let move_call_store = self.store.clone();
65 persist_tasks.push(tokio::task::spawn_blocking(move || {
66 move_call_store.persist_move_calls_in_tx_range(
67 chunk_start_tx_seq,
68 chunk_start_tx_seq + step_size as i64,
69 )
70 }));
71 }
72 futures::future::join_all(persist_tasks)
73 .await
74 .into_iter()
75 .collect::<Result<Vec<_>, _>>()
76 .tap_err(|e| {
77 error!("Error joining move call persist tasks: {:?}", e);
78 })?
79 .into_iter()
80 .collect::<Result<Vec<_>, _>>()
81 .tap_err(|e| {
82 error!("Error persisting move calls: {:?}", e);
83 })?;
84 last_processed_tx_seq += batch_size as i64;
85 info!("Persisted move_calls at tx seq: {}", last_processed_tx_seq);
86 self.metrics
87 .latest_move_call_metrics_tx_seq
88 .set(last_processed_tx_seq);
89
90 let mut tx = self.store.get_tx(last_processed_tx_seq).await?;
91 while tx.is_none() {
92 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
93 tx = self.store.get_tx(last_processed_tx_seq).await?;
94 }
95 let cp_seq = tx.unwrap().checkpoint_sequence_number;
96 let mut cp = self.store.get_cp(cp_seq).await?;
97 while cp.is_none() {
98 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
99 cp = self.store.get_cp(cp_seq).await?;
100 }
101 let end_epoch = cp.unwrap().epoch;
102 for epoch in last_processed_epoch + 1..end_epoch {
103 self.store
104 .calculate_and_persist_move_call_metrics(epoch)
105 .await?;
106 info!("Persisted move_call_metrics for epoch: {}", epoch);
107 }
108 last_processed_epoch = end_epoch - 1;
109 }
110 }
111}