iota_indexer/processors/
processor_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use futures::future::try_join_all;
6use tracing::{error, info};
7
8use super::{
9    address_metrics_processor::AddressMetricsProcessor,
10    move_call_metrics_processor::MoveCallMetricsProcessor,
11    network_metrics_processor::NetworkMetricsProcessor,
12};
13use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore};
14
15pub struct ProcessorOrchestrator<S> {
16    store: S,
17    metrics: IndexerMetrics,
18}
19
20impl<S> ProcessorOrchestrator<S>
21where
22    S: IndexerAnalyticalStore + Clone + Send + Sync + 'static,
23{
24    pub fn new(store: S, metrics: IndexerMetrics) -> Self {
25        Self { store, metrics }
26    }
27
28    pub async fn run_forever(&mut self) {
29        info!("Processor orchestrator started...");
30        let network_metrics_processor =
31            NetworkMetricsProcessor::new(self.store.clone(), self.metrics.clone());
32        let network_metrics_handle = tokio::task::spawn(async move {
33            loop {
34                let network_metrics_res = network_metrics_processor.start().await;
35                if let Err(e) = network_metrics_res {
36                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
37                    error!(
38                        "indexer network metrics processor failed with error {e:?}, retrying in 5s..."
39                    );
40                }
41            }
42        });
43
44        let addr_metrics_processor =
45            AddressMetricsProcessor::new(self.store.clone(), self.metrics.clone());
46        let addr_metrics_handle = tokio::task::spawn(async move {
47            loop {
48                let addr_metrics_res = addr_metrics_processor.start().await;
49                if let Err(e) = addr_metrics_res {
50                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
51                    error!(
52                        "indexer address metrics processor failed with error {e:?}, retrying in 5s..."
53                    );
54                }
55            }
56        });
57
58        let move_call_metrics_processor =
59            MoveCallMetricsProcessor::new(self.store.clone(), self.metrics.clone());
60        let move_call_metrics_handle = tokio::task::spawn(async move {
61            loop {
62                let move_call_metrics_res = move_call_metrics_processor.start().await;
63                if let Err(e) = move_call_metrics_res {
64                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
65                    error!(
66                        "indexer move call metrics processor failed with error {e:?}, retrying in 5s..."
67                    );
68                }
69            }
70        });
71
72        try_join_all(vec![
73            network_metrics_handle,
74            addr_metrics_handle,
75            move_call_metrics_handle,
76        ])
77        .await
78        .expect("processor orchestrator should not run into errors.");
79    }
80}