iota_indexer/processors/
address_metrics_processor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use tap::tap::TapFallible;
6use tracing::{error, info};
7
8use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore, types::IndexerResult};
9
10const ADDRESS_PROCESSOR_BATCH_SIZE: usize = 80000;
11const PARALLELISM: usize = 10;
12
13pub struct AddressMetricsProcessor<S> {
14    pub store: S,
15    metrics: IndexerMetrics,
16    pub address_processor_batch_size: usize,
17    pub address_processor_parallelism: usize,
18}
19
20impl<S> AddressMetricsProcessor<S>
21where
22    S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
23{
24    pub fn new(store: S, metrics: IndexerMetrics) -> AddressMetricsProcessor<S> {
25        let address_processor_batch_size = std::env::var("ADDRESS_PROCESSOR_BATCH_SIZE")
26            .map(|s| s.parse::<usize>().unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE))
27            .unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE);
28        let address_processor_parallelism = std::env::var("ADDRESS_PROCESSOR_PARALLELISM")
29            .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
30            .unwrap_or(PARALLELISM);
31        Self {
32            store,
33            metrics,
34            address_processor_batch_size,
35            address_processor_parallelism,
36        }
37    }
38
39    pub async fn start(&self) -> IndexerResult<()> {
40        info!("Indexer address metrics async processor started...");
41        let latest_tx_seq = self
42            .store
43            .get_address_metrics_last_processed_tx_seq()
44            .await?;
45        let mut last_processed_tx_seq = latest_tx_seq.unwrap_or_default().seq;
46        loop {
47            let mut latest_tx = self.store.get_latest_stored_transaction().await?;
48            while if let Some(tx) = latest_tx {
49                tx.tx_sequence_number
50                    < last_processed_tx_seq + self.address_processor_batch_size as i64
51            } else {
52                true
53            } {
54                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
55                latest_tx = self.store.get_latest_stored_transaction().await?;
56            }
57
58            let mut persist_tasks = vec![];
59            let batch_size = self.address_processor_batch_size;
60            let step_size = batch_size / self.address_processor_parallelism;
61            for chunk_start_tx_seq in (last_processed_tx_seq + 1
62                ..last_processed_tx_seq + batch_size as i64 + 1)
63                .step_by(step_size)
64            {
65                let active_address_store = self.store.clone();
66                persist_tasks.push(tokio::task::spawn_blocking(move || {
67                    active_address_store.persist_active_addresses_in_tx_range(
68                        chunk_start_tx_seq,
69                        chunk_start_tx_seq + step_size as i64,
70                    )
71                }));
72            }
73            for chunk_start_tx_seq in (last_processed_tx_seq + 1
74                ..last_processed_tx_seq + batch_size as i64 + 1)
75                .step_by(step_size)
76            {
77                let address_store = self.store.clone();
78                persist_tasks.push(tokio::task::spawn_blocking(move || {
79                    address_store.persist_addresses_in_tx_range(
80                        chunk_start_tx_seq,
81                        chunk_start_tx_seq + step_size as i64,
82                    )
83                }));
84            }
85            futures::future::join_all(persist_tasks)
86                .await
87                .into_iter()
88                .collect::<Result<Vec<_>, _>>()
89                .tap_err(|e| {
90                    error!("Error joining address persist tasks: {:?}", e);
91                })?
92                .into_iter()
93                .collect::<Result<Vec<_>, _>>()
94                .tap_err(|e| {
95                    error!("Error persisting addresses or active addresses: {:?}", e);
96                })?;
97            last_processed_tx_seq += self.address_processor_batch_size as i64;
98            info!(
99                "Persisted addresses and active addresses for tx seq: {}",
100                last_processed_tx_seq,
101            );
102            self.metrics
103                .latest_address_metrics_tx_seq
104                .set(last_processed_tx_seq);
105
106            let mut last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
107            while last_processed_tx.is_none() {
108                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
109                last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
110            }
111            // unwrap is safe here b/c we just checked that it's not None
112            let last_processed_cp = last_processed_tx.unwrap().checkpoint_sequence_number;
113            self.store
114                .calculate_and_persist_address_metrics(last_processed_cp)
115                .await?;
116            info!(
117                "Persisted address metrics for checkpoint: {}",
118                last_processed_cp
119            );
120        }
121    }
122}