iota_indexer/processors/network_metrics_processor.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use tap::tap::TapFallible;
use tracing::{error, info};

use crate::{
    errors::IndexerError, metrics::IndexerMetrics, store::IndexerAnalyticalStore,
    types::IndexerResult,
};

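// Defaults for batch sizing and parallelism; each can be overridden at
// runtime via an environment variable of the same name (see `new`).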
const MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 10;
const MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 80000;
const NETWORK_METRICS_PROCESSOR_PARALLELISM: usize = 1;

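/// Asynchronous processor that periodically persists tx count metrics and
/// epoch peak TPS derived from checkpoints already stored by the indexer.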
pub struct NetworkMetricsProcessor<S> {
    pub store: S,
    metrics: IndexerMetrics,
    pub min_network_metrics_processor_batch_size: usize,
    pub max_network_metrics_processor_batch_size: usize,
    pub network_metrics_processor_parallelism: usize,
}

impl<S> NetworkMetricsProcessor<S>
where
    S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
{
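    /// Builds a processor, reading batch-size and parallelism overrides from
    /// the environment; missing or unparsable values fall back to the
    /// defaults above.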
    pub fn new(store: S, metrics: IndexerMetrics) -> NetworkMetricsProcessor<S> {
        let min_network_metrics_processor_batch_size =
            std::env::var("MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE")
                .map(|s| {
                    s.parse::<usize>()
                        .unwrap_or(MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
                })
                .unwrap_or(MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
        let max_network_metrics_processor_batch_size =
            std::env::var("MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE")
                .map(|s| {
                    s.parse::<usize>()
                        .unwrap_or(MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
                })
                .unwrap_or(MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
        let network_metrics_processor_parallelism =
            std::env::var("NETWORK_METRICS_PROCESSOR_PARALLELISM")
                .map(|s| {
                    s.parse::<usize>()
                        .unwrap_or(NETWORK_METRICS_PROCESSOR_PARALLELISM)
                })
                .unwrap_or(NETWORK_METRICS_PROCESSOR_PARALLELISM);
        Self {
            store,
            metrics,
            min_network_metrics_processor_batch_size,
            max_network_metrics_processor_batch_size,
            network_metrics_processor_parallelism,
        }
    }

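    /// Runs the processing loop indefinitely: wait until enough new
    /// checkpoints are available, persist tx count metrics for the batch in
    /// parallel chunks, then persist peak TPS for any epochs that completed
    /// within the batch. Returns only on error.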
    pub async fn start(&self) -> IndexerResult<()> {
        info!("Indexer network metrics async processor started...");
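        // Resume from the latest persisted tx count checkpoint and peak-TPS
        // epoch (or from the defaults if nothing is stored yet).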
        let latest_tx_count_metrics = self
            .store
            .get_latest_tx_count_metrics()
            .await
            .unwrap_or_default();
        let latest_epoch_peak_tps = self
            .store
            .get_latest_epoch_peak_tps()
            .await
            .unwrap_or_default();
        let mut last_processed_cp_seq = latest_tx_count_metrics
            .unwrap_or_default()
            .checkpoint_sequence_number;
        let mut last_processed_peak_tps_epoch = latest_epoch_peak_tps.unwrap_or_default().epoch;

        loop {
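            // Poll the store once per second until at least
            // `min_network_metrics_processor_batch_size` checkpoints beyond
            // the last processed one have been stored.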
            let latest_stored_checkpoint = loop {
                if let Some(latest_stored_checkpoint) =
                    self.store.get_latest_stored_checkpoint().await?
                {
                    if latest_stored_checkpoint.sequence_number
                        >= last_processed_cp_seq
                            + self.min_network_metrics_processor_batch_size as i64
                    {
                        break latest_stored_checkpoint;
                    }
                }
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            };

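            // Cap the batch at the configured maximum so a large backlog is
            // processed in bounded steps.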
            let available_checkpoints =
                latest_stored_checkpoint.sequence_number - last_processed_cp_seq;
            let batch_size =
                available_checkpoints.min(self.max_network_metrics_processor_batch_size as i64);

            info!(
                "Preparing tx count metrics for checkpoints [{}-{}]",
                last_processed_cp_seq + 1,
                last_processed_cp_seq + batch_size
            );

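            // Split the batch into roughly equal chunks (about one per
            // configured worker) and persist each chunk on a blocking thread,
            // since the store call is synchronous. Chunks are built as
            // half-open ranges; the `+ 1` below lets the final chunk's
            // exclusive end cover the last checkpoint of the batch.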
            let step_size =
                (batch_size as usize / self.network_metrics_processor_parallelism).max(1);
            let mut persist_tasks = vec![];

            for chunk_start_cp in
                (last_processed_cp_seq + 1..=last_processed_cp_seq + batch_size).step_by(step_size)
            {
                let chunk_end_cp =
                    (chunk_start_cp + step_size as i64).min(last_processed_cp_seq + batch_size + 1);

                let store = self.store.clone();
                persist_tasks.push(tokio::task::spawn_blocking(move || {
                    store.persist_tx_count_metrics(chunk_start_cp, chunk_end_cp)
                }));
            }

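            // Await all chunk tasks; the first collect surfaces join errors
            // (e.g. a panicked task), the second surfaces store errors from
            // any chunk. Either aborts the loop via `?`.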
            futures::future::join_all(persist_tasks)
                .await
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .tap_err(|e| error!("Error joining network persist tasks: {:?}", e))?
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .tap_err(|e| error!("Error persisting tx count metrics: {:?}", e))?;

            last_processed_cp_seq += batch_size;

            self.metrics
                .latest_network_metrics_cp_seq
                .set(last_processed_cp_seq);

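            // Look up the epoch of the last processed checkpoint, then
            // persist peak TPS for every epoch that has fully completed
            // since the previous pass.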
            let end_cp = self
                .store
                .get_checkpoints_in_range(last_processed_cp_seq, last_processed_cp_seq + 1)
                .await?
                .first()
                .ok_or(IndexerError::PostgresRead(
                    "Cannot read checkpoint from PG for epoch peak TPS".to_string(),
                ))?
                .clone();
            for epoch in last_processed_peak_tps_epoch + 1..end_cp.epoch {
                self.store.persist_epoch_peak_tps(epoch).await?;
                last_processed_peak_tps_epoch = epoch;
                info!("Persisted epoch peak TPS for epoch {}", epoch);
            }
        }
    }
}