iota_indexer/processors/
processor_orchestrator.rs1use futures::future::try_join_all;
6use tracing::{error, info};
7
8use super::{
9 address_metrics_processor::AddressMetricsProcessor,
10 move_call_metrics_processor::MoveCallMetricsProcessor,
11 network_metrics_processor::NetworkMetricsProcessor,
12};
13use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore};
14
15pub struct ProcessorOrchestrator<S> {
16 store: S,
17 metrics: IndexerMetrics,
18}
19
20impl<S> ProcessorOrchestrator<S>
21where
22 S: IndexerAnalyticalStore + Clone + Send + Sync + 'static,
23{
24 pub fn new(store: S, metrics: IndexerMetrics) -> Self {
25 Self { store, metrics }
26 }
27
28 pub async fn run_forever(&mut self) {
29 info!("Processor orchestrator started...");
30 let network_metrics_processor =
31 NetworkMetricsProcessor::new(self.store.clone(), self.metrics.clone());
32 let network_metrics_handle = tokio::task::spawn(async move {
33 loop {
34 let network_metrics_res = network_metrics_processor.start().await;
35 if let Err(e) = network_metrics_res {
36 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
37 error!(
38 "indexer network metrics processor failed with error {e:?}, retrying in 5s..."
39 );
40 }
41 }
42 });
43
44 let addr_metrics_processor =
45 AddressMetricsProcessor::new(self.store.clone(), self.metrics.clone());
46 let addr_metrics_handle = tokio::task::spawn(async move {
47 loop {
48 let addr_metrics_res = addr_metrics_processor.start().await;
49 if let Err(e) = addr_metrics_res {
50 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
51 error!(
52 "indexer address metrics processor failed with error {e:?}, retrying in 5s..."
53 );
54 }
55 }
56 });
57
58 let move_call_metrics_processor =
59 MoveCallMetricsProcessor::new(self.store.clone(), self.metrics.clone());
60 let move_call_metrics_handle = tokio::task::spawn(async move {
61 loop {
62 let move_call_metrics_res = move_call_metrics_processor.start().await;
63 if let Err(e) = move_call_metrics_res {
64 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
65 error!(
66 "indexer move call metrics processor failed with error {e:?}, retrying in 5s..."
67 );
68 }
69 }
70 });
71
72 try_join_all(vec![
73 network_metrics_handle,
74 addr_metrics_handle,
75 move_call_metrics_handle,
76 ])
77 .await
78 .expect("processor orchestrator should not run into errors.");
79 }
80}