iota_indexer/processors/
processor_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use futures::future::try_join_all;
6use tracing::{error, info};
7
8use super::{
9    address_metrics_processor::AddressMetricsProcessor,
10    move_call_metrics_processor::MoveCallMetricsProcessor,
11    network_metrics_processor::NetworkMetricsProcessor,
12};
13use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore};
14
15pub struct ProcessorOrchestrator<S> {
16    store: S,
17    metrics: IndexerMetrics,
18}
19
20impl<S> ProcessorOrchestrator<S>
21where
22    S: IndexerAnalyticalStore + Clone + Send + Sync + 'static,
23{
24    pub fn new(store: S, metrics: IndexerMetrics) -> Self {
25        Self { store, metrics }
26    }
27
28    pub async fn run_forever(&mut self) {
29        info!("Processor orchestrator started...");
30        let network_metrics_processor =
31            NetworkMetricsProcessor::new(self.store.clone(), self.metrics.clone());
32        let network_metrics_handle = tokio::task::spawn(async move {
33            loop {
34                let network_metrics_res = network_metrics_processor.start().await;
35                if let Err(e) = network_metrics_res {
36                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
37                    error!(
38                        "Indexer network metrics processor failed with error {:?}, retrying in 5s...",
39                        e
40                    );
41                }
42            }
43        });
44
45        let addr_metrics_processor =
46            AddressMetricsProcessor::new(self.store.clone(), self.metrics.clone());
47        let addr_metrics_handle = tokio::task::spawn(async move {
48            loop {
49                let addr_metrics_res = addr_metrics_processor.start().await;
50                if let Err(e) = addr_metrics_res {
51                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
52                    error!(
53                        "Indexer address metrics processor failed with error {:?}, retrying in 5s...",
54                        e
55                    );
56                }
57            }
58        });
59
60        let move_call_metrics_processor =
61            MoveCallMetricsProcessor::new(self.store.clone(), self.metrics.clone());
62        let move_call_metrics_handle = tokio::task::spawn(async move {
63            loop {
64                let move_call_metrics_res = move_call_metrics_processor.start().await;
65                if let Err(e) = move_call_metrics_res {
66                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
67                    error!(
68                        "Indexer move call metrics processor failed with error {:?}, retrying in 5s...",
69                        e
70                    );
71                }
72            }
73        });
74
75        try_join_all(vec![
76            network_metrics_handle,
77            addr_metrics_handle,
78            move_call_metrics_handle,
79        ])
80        .await
81        .expect("Processor orchestrator should not run into errors.");
82    }
83}