iota_core/quorum_driver/
reconfig_observer.rs1use std::sync::Arc;
6
7use async_trait::async_trait;
8use iota_types::iota_system_state::{
9 IotaSystemState, IotaSystemStateTrait,
10 epoch_start_iota_system_state::EpochStartSystemStateTrait,
11};
12use tokio::sync::broadcast::error::RecvError;
13use tracing::{info, warn};
14
15use super::QuorumDriver;
16use crate::{
17 authority_aggregator::AuthAggMetrics,
18 authority_client::{AuthorityAPI, NetworkAuthorityClient},
19 epoch::committee_store::CommitteeStore,
20 execution_cache::ObjectCacheRead,
21 safe_client::SafeClientMetricsBase,
22};
23
24#[async_trait]
25pub trait ReconfigObserver<A: Clone> {
26 async fn run(&mut self, quorum_driver: Arc<QuorumDriver<A>>);
27 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
28}
29
30pub struct OnsiteReconfigObserver {
33 reconfig_rx: tokio::sync::broadcast::Receiver<IotaSystemState>,
34 execution_cache: Arc<dyn ObjectCacheRead>,
35 committee_store: Arc<CommitteeStore>,
36 safe_client_metrics_base: SafeClientMetricsBase,
38 auth_agg_metrics: AuthAggMetrics,
39}
40
41impl OnsiteReconfigObserver {
42 pub fn new(
43 reconfig_rx: tokio::sync::broadcast::Receiver<IotaSystemState>,
44 execution_cache: Arc<dyn ObjectCacheRead>,
45 committee_store: Arc<CommitteeStore>,
46 safe_client_metrics_base: SafeClientMetricsBase,
47 auth_agg_metrics: AuthAggMetrics,
48 ) -> Self {
49 Self {
50 reconfig_rx,
51 execution_cache,
52 committee_store,
53 safe_client_metrics_base,
54 auth_agg_metrics,
55 }
56 }
57}
58
59#[async_trait]
60impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
61 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
62 Box::new(Self {
63 reconfig_rx: self.reconfig_rx.resubscribe(),
64 execution_cache: self.execution_cache.clone(),
65 committee_store: self.committee_store.clone(),
66 safe_client_metrics_base: self.safe_client_metrics_base.clone(),
67 auth_agg_metrics: self.auth_agg_metrics.clone(),
68 })
69 }
70
71 async fn run(&mut self, quorum_driver: Arc<QuorumDriver<NetworkAuthorityClient>>) {
72 loop {
73 match self.reconfig_rx.recv().await {
74 Ok(system_state) => {
75 let epoch_start_state = system_state.into_epoch_start_state();
76 let committee = epoch_start_state.get_iota_committee();
77 info!("Got reconfig message. New committee: {}", committee);
78 if committee.epoch() > quorum_driver.current_epoch() {
79 let new_auth_agg = quorum_driver
80 .authority_aggregator()
81 .load()
82 .recreate_with_new_epoch_start_state(&epoch_start_state);
83 quorum_driver
84 .update_validators(Arc::new(new_auth_agg))
85 .await;
86 } else {
87 warn!("Epoch number decreased - ignoring committee: {}", committee);
89 }
90 }
91 Err(RecvError::Lagged(_)) => {
93 continue;
94 }
95 Err(RecvError::Closed) => {
96 if cfg!(msim) {
98 return;
99 } else {
100 panic!("Do not expect the channel to be closed")
101 }
102 }
103 }
104 }
105 }
106}
107pub struct DummyReconfigObserver;
109
110#[async_trait]
111impl<A> ReconfigObserver<A> for DummyReconfigObserver
112where
113 A: AuthorityAPI + Send + Sync + Clone + 'static,
114{
115 fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync> {
116 Box::new(Self {})
117 }
118
119 async fn run(&mut self, _quorum_driver: Arc<QuorumDriver<A>>) {}
120}