iota_indexer/processors/
address_metrics_processor.rs1use tap::tap::TapFallible;
6use tracing::{error, info};
7
8use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore, types::IndexerResult};
9
10const ADDRESS_PROCESSOR_BATCH_SIZE: usize = 80000;
11const PARALLELISM: usize = 10;
12
13pub struct AddressMetricsProcessor<S> {
14 pub store: S,
15 metrics: IndexerMetrics,
16 pub address_processor_batch_size: usize,
17 pub address_processor_parallelism: usize,
18}
19
20impl<S> AddressMetricsProcessor<S>
21where
22 S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
23{
24 pub fn new(store: S, metrics: IndexerMetrics) -> AddressMetricsProcessor<S> {
25 let address_processor_batch_size = std::env::var("ADDRESS_PROCESSOR_BATCH_SIZE")
26 .map(|s| s.parse::<usize>().unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE))
27 .unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE);
28 let address_processor_parallelism = std::env::var("ADDRESS_PROCESSOR_PARALLELISM")
29 .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
30 .unwrap_or(PARALLELISM);
31 Self {
32 store,
33 metrics,
34 address_processor_batch_size,
35 address_processor_parallelism,
36 }
37 }
38
39 pub async fn start(&self) -> IndexerResult<()> {
40 info!("Indexer address metrics async processor started...");
41 let latest_tx_seq = self
42 .store
43 .get_address_metrics_last_processed_tx_seq()
44 .await?;
45 let mut last_processed_tx_seq = latest_tx_seq.unwrap_or_default().seq;
46 loop {
47 let mut latest_tx = self.store.get_latest_stored_transaction().await?;
48 while if let Some(tx) = latest_tx {
49 tx.tx_sequence_number
50 < last_processed_tx_seq + self.address_processor_batch_size as i64
51 } else {
52 true
53 } {
54 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
55 latest_tx = self.store.get_latest_stored_transaction().await?;
56 }
57
58 let mut persist_tasks = vec![];
59 let batch_size = self.address_processor_batch_size;
60 let step_size = batch_size / self.address_processor_parallelism;
61 for chunk_start_tx_seq in (last_processed_tx_seq + 1
62 ..last_processed_tx_seq + batch_size as i64 + 1)
63 .step_by(step_size)
64 {
65 let address_store = self.store.clone();
66 persist_tasks.push(tokio::task::spawn_blocking(move || {
67 address_store.persist_addresses_in_tx_range(
68 chunk_start_tx_seq,
69 chunk_start_tx_seq + step_size as i64,
70 )
71 }));
72 }
73 futures::future::join_all(persist_tasks)
74 .await
75 .into_iter()
76 .collect::<Result<Vec<_>, _>>()
77 .tap_err(|e| {
78 error!("Error joining address persist tasks: {:?}", e);
79 })?
80 .into_iter()
81 .collect::<Result<Vec<_>, _>>()
82 .tap_err(|e| {
83 error!("Error persisting addresses or active addresses: {:?}", e);
84 })?;
85 last_processed_tx_seq += self.address_processor_batch_size as i64;
86 info!(
87 "Persisted addresses and active addresses for tx seq: {}",
88 last_processed_tx_seq,
89 );
90 self.metrics
91 .latest_address_metrics_tx_seq
92 .set(last_processed_tx_seq);
93
94 let mut last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
95 while last_processed_tx.is_none() {
96 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
97 last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
98 }
99 let last_processed_cp = last_processed_tx.unwrap().checkpoint_sequence_number;
101 self.store
102 .calculate_and_persist_address_metrics(last_processed_cp)
103 .await?;
104 info!(
105 "Persisted address metrics for checkpoint: {}",
106 last_processed_cp
107 );
108 }
109 }
110}