iota_indexer/processors/
processor_orchestrator.rs1use futures::future::try_join_all;
6use tracing::{error, info};
7
8use super::{
9 address_metrics_processor::AddressMetricsProcessor,
10 move_call_metrics_processor::MoveCallMetricsProcessor,
11 network_metrics_processor::NetworkMetricsProcessor,
12};
13use crate::{metrics::IndexerMetrics, store::IndexerAnalyticalStore};
14
15pub struct ProcessorOrchestrator<S> {
16 store: S,
17 metrics: IndexerMetrics,
18}
19
20impl<S> ProcessorOrchestrator<S>
21where
22 S: IndexerAnalyticalStore + Clone + Send + Sync + 'static,
23{
24 pub fn new(store: S, metrics: IndexerMetrics) -> Self {
25 Self { store, metrics }
26 }
27
28 pub async fn run_forever(&mut self) {
29 info!("Processor orchestrator started...");
30 let network_metrics_processor =
31 NetworkMetricsProcessor::new(self.store.clone(), self.metrics.clone());
32 let network_metrics_handle = tokio::task::spawn(async move {
33 loop {
34 let network_metrics_res = network_metrics_processor.start().await;
35 if let Err(e) = network_metrics_res {
36 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
37 error!(
38 "Indexer network metrics processor failed with error {:?}, retrying in 5s...",
39 e
40 );
41 }
42 }
43 });
44
45 let addr_metrics_processor =
46 AddressMetricsProcessor::new(self.store.clone(), self.metrics.clone());
47 let addr_metrics_handle = tokio::task::spawn(async move {
48 loop {
49 let addr_metrics_res = addr_metrics_processor.start().await;
50 if let Err(e) = addr_metrics_res {
51 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
52 error!(
53 "Indexer address metrics processor failed with error {:?}, retrying in 5s...",
54 e
55 );
56 }
57 }
58 });
59
60 let move_call_metrics_processor =
61 MoveCallMetricsProcessor::new(self.store.clone(), self.metrics.clone());
62 let move_call_metrics_handle = tokio::task::spawn(async move {
63 loop {
64 let move_call_metrics_res = move_call_metrics_processor.start().await;
65 if let Err(e) = move_call_metrics_res {
66 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
67 error!(
68 "Indexer move call metrics processor failed with error {:?}, retrying in 5s...",
69 e
70 );
71 }
72 }
73 });
74
75 try_join_all(vec![
76 network_metrics_handle,
77 addr_metrics_handle,
78 move_call_metrics_handle,
79 ])
80 .await
81 .expect("Processor orchestrator should not run into errors.");
82 }
83}