iota_indexer/processors/
network_metrics_processor.rs1use tap::tap::TapFallible;
6use tracing::{error, info};
7
8use crate::{
9 errors::IndexerError, metrics::IndexerMetrics, store::IndexerAnalyticalStore,
10 types::IndexerResult,
11};
12
/// Default for the minimum number of not-yet-processed checkpoints that must be
/// stored before a batch is started; used when the
/// `MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE` environment variable is unset or
/// does not parse as a `usize`.
const MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 10;

/// Default upper bound on the number of checkpoints handled in one batch; used
/// when the `MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE` environment variable is
/// unset or does not parse as a `usize`.
const MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 80_000;

/// Default divisor used to split a batch into separately persisted chunks; used
/// when the `NETWORK_METRICS_PROCESSOR_PARALLELISM` environment variable is
/// unset or does not parse as a `usize`.
const NETWORK_METRICS_PROCESSOR_PARALLELISM: usize = 1;
16
/// Processor that derives network metrics (transaction-count metrics and
/// per-epoch peak TPS) from stored checkpoints and persists them through the
/// store `S`.
pub struct NetworkMetricsProcessor<S> {
    /// Store used both to read checkpoints / previously persisted metrics and
    /// to persist newly computed metrics.
    pub store: S,
    /// Indexer metrics; the processor updates `latest_network_metrics_cp_seq`
    /// after each processed batch.
    metrics: IndexerMetrics,
    /// Minimum number of unprocessed stored checkpoints required before a
    /// batch is started.
    pub min_network_metrics_processor_batch_size: usize,
    /// Maximum number of checkpoints processed in a single batch.
    pub max_network_metrics_processor_batch_size: usize,
    /// Divisor used to split a batch into chunks that are persisted by
    /// separately spawned blocking tasks.
    pub network_metrics_processor_parallelism: usize,
}
24
25impl<S> NetworkMetricsProcessor<S>
26where
27 S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
28{
29 pub fn new(store: S, metrics: IndexerMetrics) -> NetworkMetricsProcessor<S> {
30 let min_network_metrics_processor_batch_size =
31 std::env::var("MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE")
32 .map(|s| {
33 s.parse::<usize>()
34 .unwrap_or(MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
35 })
36 .unwrap_or(MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
37 let max_network_metrics_processor_batch_size =
38 std::env::var("MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE")
39 .map(|s| {
40 s.parse::<usize>()
41 .unwrap_or(MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
42 })
43 .unwrap_or(MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
44 let network_metrics_processor_parallelism =
45 std::env::var("NETWORK_METRICS_PROCESSOR_PARALLELISM")
46 .map(|s| {
47 s.parse::<usize>()
48 .unwrap_or(NETWORK_METRICS_PROCESSOR_PARALLELISM)
49 })
50 .unwrap_or(NETWORK_METRICS_PROCESSOR_PARALLELISM);
51 Self {
52 store,
53 metrics,
54 min_network_metrics_processor_batch_size,
55 max_network_metrics_processor_batch_size,
56 network_metrics_processor_parallelism,
57 }
58 }
59
60 pub async fn start(&self) -> IndexerResult<()> {
61 info!("Indexer network metrics async processor started...");
62 let latest_tx_count_metrics = self
63 .store
64 .get_latest_tx_count_metrics()
65 .await
66 .unwrap_or_default();
67 let latest_epoch_peak_tps = self
68 .store
69 .get_latest_epoch_peak_tps()
70 .await
71 .unwrap_or_default();
72 let mut last_processed_cp_seq = latest_tx_count_metrics
73 .unwrap_or_default()
74 .checkpoint_sequence_number;
75 let mut last_processed_peak_tps_epoch = latest_epoch_peak_tps.unwrap_or_default().epoch;
76
77 loop {
78 let latest_stored_checkpoint = loop {
79 if let Some(latest_stored_checkpoint) =
80 self.store.get_latest_stored_checkpoint().await?
81 {
82 if latest_stored_checkpoint.sequence_number
83 >= last_processed_cp_seq
84 + self.min_network_metrics_processor_batch_size as i64
85 {
86 break latest_stored_checkpoint;
87 }
88 }
89 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
90 };
91
92 let available_checkpoints =
93 latest_stored_checkpoint.sequence_number - last_processed_cp_seq;
94 let batch_size =
95 available_checkpoints.min(self.max_network_metrics_processor_batch_size as i64);
96
97 info!(
98 "Preparing tx count metrics for checkpoints [{}-{}]",
99 last_processed_cp_seq + 1,
100 last_processed_cp_seq + batch_size
101 );
102
103 let step_size =
104 (batch_size as usize / self.network_metrics_processor_parallelism).max(1);
105 let mut persist_tasks = vec![];
106
107 for chunk_start_cp in
108 (last_processed_cp_seq + 1..=last_processed_cp_seq + batch_size).step_by(step_size)
109 {
110 let chunk_end_cp =
111 (chunk_start_cp + step_size as i64).min(last_processed_cp_seq + batch_size + 1);
112
113 let store = self.store.clone();
114 persist_tasks.push(tokio::task::spawn_blocking(move || {
115 store.persist_tx_count_metrics(chunk_start_cp, chunk_end_cp)
116 }));
117 }
118
119 futures::future::join_all(persist_tasks)
120 .await
121 .into_iter()
122 .collect::<Result<Vec<_>, _>>()
123 .tap_err(|e| error!("Error joining network persist tasks: {:?}", e))?
124 .into_iter()
125 .collect::<Result<Vec<_>, _>>()
126 .tap_err(|e| error!("Error persisting tx count metrics: {:?}", e))?;
127
128 last_processed_cp_seq += batch_size;
129
130 self.metrics
131 .latest_network_metrics_cp_seq
132 .set(last_processed_cp_seq);
133
134 let end_cp = self
135 .store
136 .get_checkpoints_in_range(last_processed_cp_seq, last_processed_cp_seq + 1)
137 .await?
138 .first()
139 .ok_or(IndexerError::PostgresRead(
140 "Cannot read checkpoint from PG for epoch peak TPS".to_string(),
141 ))?
142 .clone();
143 for epoch in last_processed_peak_tps_epoch + 1..end_cp.epoch {
144 self.store.persist_epoch_peak_tps(epoch).await?;
145 last_processed_peak_tps_epoch = epoch;
146 info!("Persisted epoch peak TPS for epoch {}", epoch);
147 }
148 }
149 }
150}