iota_core/epoch/
committee_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    path::{Path, PathBuf},
8    sync::Arc,
9};
10
11use iota_macros::nondeterministic;
12use iota_types::{
13    base_types::ObjectID,
14    committee::{Committee, EpochId},
15    error::{IotaError, IotaResult},
16};
17use parking_lot::RwLock;
18use typed_store::{
19    DBMapUtils, Map,
20    rocks::{DBMap, DBOptions, MetricConf, default_db_options},
21    rocksdb::Options,
22    traits::{TableSummary, TypedStoreDebug},
23};
24
/// Store for per-epoch [`Committee`] data, combining the on-disk tables with
/// an in-memory cache.
pub struct CommitteeStore {
    /// Database tables holding the committee of each epoch.
    tables: CommitteeStoreTables,
    /// In-memory map from epoch to its committee. Entries are added when a
    /// committee is inserted into the store or first read from `tables`.
    cache: RwLock<HashMap<EpochId, Arc<Committee>>>,
}
29
/// Database tables used by [`CommitteeStore`].
///
/// NOTE(review): the `open_tables_read_write` constructor used by
/// `CommitteeStore::new` is not defined in this file; presumably it is
/// generated by the `DBMapUtils` derive — confirm against `typed_store`.
#[derive(DBMapUtils)]
pub struct CommitteeStoreTables {
    /// Map from each epoch ID to the committee information.
    // The table options come from `committee_table_default_config`, referenced
    // by name in the attribute below.
    #[default_options_override_fn = "committee_table_default_config"]
    committee_map: DBMap<EpochId, Committee>,
}
36
37// These functions are used to initialize the DB tables
38fn committee_table_default_config() -> DBOptions {
39    default_db_options().optimize_for_point_lookup(64)
40}
41
42impl CommitteeStore {
43    pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
44        let tables = CommitteeStoreTables::open_tables_read_write(
45            path,
46            MetricConf::new("committee"),
47            db_options,
48            None,
49        );
50        let store = Self {
51            tables,
52            cache: RwLock::new(HashMap::new()),
53        };
54        if store.database_is_empty() {
55            store
56                .init_genesis_committee(genesis_committee.clone())
57                .expect("Init genesis committee data must not fail");
58        }
59        store
60    }
61
62    pub fn new_for_testing(genesis_committee: &Committee) -> Self {
63        let dir = std::env::temp_dir();
64        let path = dir.join(format!("DB_{:?}", nondeterministic!(ObjectID::random())));
65        Self::new(path, genesis_committee, None)
66    }
67
68    pub fn init_genesis_committee(&self, genesis_committee: Committee) -> IotaResult {
69        assert_eq!(genesis_committee.epoch, 0);
70        self.tables.committee_map.insert(&0, &genesis_committee)?;
71        self.cache.write().insert(0, Arc::new(genesis_committee));
72        Ok(())
73    }
74
75    pub fn insert_new_committee(&self, new_committee: &Committee) -> IotaResult {
76        if let Some(old_committee) = self.get_committee(&new_committee.epoch)? {
77            // If somehow we already have this committee in the store, they must be the
78            // same.
79            assert_eq!(&*old_committee, new_committee);
80        } else {
81            self.tables
82                .committee_map
83                .insert(&new_committee.epoch, new_committee)?;
84            self.cache
85                .write()
86                .insert(new_committee.epoch, Arc::new(new_committee.clone()));
87        }
88        Ok(())
89    }
90
91    pub fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Option<Arc<Committee>>> {
92        if let Some(committee) = self.cache.read().get(epoch_id) {
93            return Ok(Some(committee.clone()));
94        }
95        let committee = self.tables.committee_map.get(epoch_id)?;
96        let committee = committee.map(Arc::new);
97        if let Some(committee) = committee.as_ref() {
98            self.cache.write().insert(*epoch_id, committee.clone());
99        }
100        Ok(committee)
101    }
102
103    // todo - make use of cache or remove this method
104    pub fn get_latest_committee(&self) -> Committee {
105        self.tables
106            .committee_map
107            .unbounded_iter()
108            .skip_to_last()
109            .next()
110            // unwrap safe because we guarantee there is at least a genesis epoch
111            // when initializing the store.
112            .unwrap()
113            .1
114    }
115    /// Return the committee specified by `epoch`. If `epoch` is `None`, return
116    /// the latest committee.
117    // todo - make use of cache or remove this method
118    pub fn get_or_latest_committee(&self, epoch: Option<EpochId>) -> IotaResult<Committee> {
119        Ok(match epoch {
120            Some(epoch) => self
121                .get_committee(&epoch)?
122                .ok_or(IotaError::MissingCommitteeAtEpoch(epoch))
123                .map(|c| Committee::clone(&*c))?,
124            None => self.get_latest_committee(),
125        })
126    }
127
128    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
129        self.tables
130            .committee_map
131            .checkpoint_db(path)
132            .map_err(Into::into)
133    }
134
135    fn database_is_empty(&self) -> bool {
136        self.tables.committee_map.unbounded_iter().next().is_none()
137    }
138}