iota_indexer/processors/
address_metrics_processor.rs1use tap::tap::TapFallible;
6use tracing::{error, info};
7
8use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore, types::IndexerResult};
9
/// Default number of transaction sequence numbers handled per processing batch.
///
/// `AddressMetricsProcessor::new` falls back to this value when the
/// `ADDRESS_PROCESSOR_BATCH_SIZE` environment variable is unset or cannot be parsed as a `usize`.
const ADDRESS_PROCESSOR_BATCH_SIZE: usize = 80_000;

/// Default divisor used to split one batch into chunks that are persisted concurrently.
///
/// `AddressMetricsProcessor::new` falls back to this value when the
/// `ADDRESS_PROCESSOR_PARALLELISM` environment variable is unset or cannot be parsed as a `usize`.
const PARALLELISM: usize = 10;
12
13pub struct AddressMetricsProcessor<S> {
14 pub store: S,
15 metrics: IndexerMetrics,
16 pub address_processor_batch_size: usize,
17 pub address_processor_parallelism: usize,
18}
19
20impl<S> AddressMetricsProcessor<S>
21where
22 S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
23{
24 pub fn new(store: S, metrics: IndexerMetrics) -> AddressMetricsProcessor<S> {
25 let address_processor_batch_size = std::env::var("ADDRESS_PROCESSOR_BATCH_SIZE")
26 .map(|s| s.parse::<usize>().unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE))
27 .unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE);
28 let address_processor_parallelism = std::env::var("ADDRESS_PROCESSOR_PARALLELISM")
29 .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
30 .unwrap_or(PARALLELISM);
31 Self {
32 store,
33 metrics,
34 address_processor_batch_size,
35 address_processor_parallelism,
36 }
37 }
38
39 pub async fn start(&self) -> IndexerResult<()> {
40 info!("Indexer address metrics async processor started...");
41 let latest_tx_seq = self
42 .store
43 .get_address_metrics_last_processed_tx_seq()
44 .await?;
45 let mut last_processed_tx_seq = latest_tx_seq.unwrap_or_default().seq;
46 loop {
47 let mut latest_tx = self.store.get_latest_stored_transaction().await?;
48 while if let Some(tx) = latest_tx {
49 tx.tx_sequence_number
50 < last_processed_tx_seq + self.address_processor_batch_size as i64
51 } else {
52 true
53 } {
54 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
55 latest_tx = self.store.get_latest_stored_transaction().await?;
56 }
57
58 let mut persist_tasks = vec![];
59 let batch_size = self.address_processor_batch_size;
60 let step_size = batch_size / self.address_processor_parallelism;
61 for chunk_start_tx_seq in (last_processed_tx_seq + 1
62 ..last_processed_tx_seq + batch_size as i64 + 1)
63 .step_by(step_size)
64 {
65 let active_address_store = self.store.clone();
66 persist_tasks.push(tokio::task::spawn_blocking(move || {
67 active_address_store.persist_active_addresses_in_tx_range(
68 chunk_start_tx_seq,
69 chunk_start_tx_seq + step_size as i64,
70 )
71 }));
72 }
73 for chunk_start_tx_seq in (last_processed_tx_seq + 1
74 ..last_processed_tx_seq + batch_size as i64 + 1)
75 .step_by(step_size)
76 {
77 let address_store = self.store.clone();
78 persist_tasks.push(tokio::task::spawn_blocking(move || {
79 address_store.persist_addresses_in_tx_range(
80 chunk_start_tx_seq,
81 chunk_start_tx_seq + step_size as i64,
82 )
83 }));
84 }
85 futures::future::join_all(persist_tasks)
86 .await
87 .into_iter()
88 .collect::<Result<Vec<_>, _>>()
89 .tap_err(|e| {
90 error!("Error joining address persist tasks: {:?}", e);
91 })?
92 .into_iter()
93 .collect::<Result<Vec<_>, _>>()
94 .tap_err(|e| {
95 error!("Error persisting addresses or active addresses: {:?}", e);
96 })?;
97 last_processed_tx_seq += self.address_processor_batch_size as i64;
98 info!(
99 "Persisted addresses and active addresses for tx seq: {}",
100 last_processed_tx_seq,
101 );
102 self.metrics
103 .latest_address_metrics_tx_seq
104 .set(last_processed_tx_seq);
105
106 let mut last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
107 while last_processed_tx.is_none() {
108 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
109 last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
110 }
111 let last_processed_cp = last_processed_tx.unwrap().checkpoint_sequence_number;
113 self.store
114 .calculate_and_persist_address_metrics(last_processed_cp)
115 .await?;
116 info!(
117 "Persisted address metrics for checkpoint: {}",
118 last_processed_cp
119 );
120 }
121 }
122}