iota_core/quorum_driver/
reconfig_observer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use iota_types::iota_system_state::{
9    IotaSystemState, IotaSystemStateTrait,
10    epoch_start_iota_system_state::EpochStartSystemStateTrait,
11};
12use tokio::sync::broadcast::error::RecvError;
13use tracing::{info, warn};
14
15use super::QuorumDriver;
16use crate::{
17    authority_aggregator::AuthAggMetrics,
18    authority_client::{AuthorityAPI, NetworkAuthorityClient},
19    epoch::committee_store::CommitteeStore,
20    execution_cache::ObjectCacheRead,
21    safe_client::SafeClientMetricsBase,
22};
23
24#[async_trait]
25pub trait ReconfigObserver<A: Clone> {
26    async fn run(&mut self, quorum_driver: Arc<QuorumDriver<A>>);
27    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync>;
28}
29
30/// A ReconfigObserver that subscribes to a reconfig channel of new committee.
31/// This is used in TransactionOrchestrator.
32pub struct OnsiteReconfigObserver {
33    reconfig_rx: tokio::sync::broadcast::Receiver<IotaSystemState>,
34    execution_cache: Arc<dyn ObjectCacheRead>,
35    committee_store: Arc<CommitteeStore>,
36    // TODO: Use Arc for both metrics.
37    safe_client_metrics_base: SafeClientMetricsBase,
38    auth_agg_metrics: AuthAggMetrics,
39}
40
41impl OnsiteReconfigObserver {
42    pub fn new(
43        reconfig_rx: tokio::sync::broadcast::Receiver<IotaSystemState>,
44        execution_cache: Arc<dyn ObjectCacheRead>,
45        committee_store: Arc<CommitteeStore>,
46        safe_client_metrics_base: SafeClientMetricsBase,
47        auth_agg_metrics: AuthAggMetrics,
48    ) -> Self {
49        Self {
50            reconfig_rx,
51            execution_cache,
52            committee_store,
53            safe_client_metrics_base,
54            auth_agg_metrics,
55        }
56    }
57}
58
59#[async_trait]
60impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
61    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<NetworkAuthorityClient> + Send + Sync> {
62        Box::new(Self {
63            reconfig_rx: self.reconfig_rx.resubscribe(),
64            execution_cache: self.execution_cache.clone(),
65            committee_store: self.committee_store.clone(),
66            safe_client_metrics_base: self.safe_client_metrics_base.clone(),
67            auth_agg_metrics: self.auth_agg_metrics.clone(),
68        })
69    }
70
71    async fn run(&mut self, quorum_driver: Arc<QuorumDriver<NetworkAuthorityClient>>) {
72        loop {
73            match self.reconfig_rx.recv().await {
74                Ok(system_state) => {
75                    let epoch_start_state = system_state.into_epoch_start_state();
76                    let committee = epoch_start_state.get_iota_committee();
77                    info!("Got reconfig message. New committee: {}", committee);
78                    if committee.epoch() > quorum_driver.current_epoch() {
79                        let new_auth_agg = quorum_driver
80                            .authority_aggregator()
81                            .load()
82                            .recreate_with_new_epoch_start_state(&epoch_start_state);
83                        quorum_driver
84                            .update_validators(Arc::new(new_auth_agg))
85                            .await;
86                    } else {
87                        // This should only happen when the node just starts
88                        warn!("Epoch number decreased - ignoring committee: {}", committee);
89                    }
90                }
91                // It's ok to miss messages due to overflow here
92                Err(RecvError::Lagged(_)) => {
93                    continue;
94                }
95                Err(RecvError::Closed) => {
96                    // Closing the channel only happens in simtest when a node is shut down.
97                    if cfg!(msim) {
98                        return;
99                    } else {
100                        panic!("Do not expect the channel to be closed")
101                    }
102                }
103            }
104        }
105    }
106}
107/// A dummy ReconfigObserver for testing.
108pub struct DummyReconfigObserver;
109
110#[async_trait]
111impl<A> ReconfigObserver<A> for DummyReconfigObserver
112where
113    A: AuthorityAPI + Send + Sync + Clone + 'static,
114{
115    fn clone_boxed(&self) -> Box<dyn ReconfigObserver<A> + Send + Sync> {
116        Box::new(Self {})
117    }
118
119    async fn run(&mut self, _quorum_driver: Arc<QuorumDriver<A>>) {}
120}