iota_core/epoch/
committee_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    path::{Path, PathBuf},
8    sync::Arc,
9};
10
11use iota_macros::nondeterministic;
12use iota_types::{
13    base_types::ObjectID,
14    committee::{Committee, EpochId},
15    error::{IotaError, IotaResult},
16};
17use parking_lot::RwLock;
18use typed_store::{
19    DBMapUtils, Map,
20    rocks::{DBMap, DBOptions, MetricConf, default_db_options},
21    rocksdb::Options,
22    traits::{TableSummary, TypedStoreDebug},
23};
24
25pub struct CommitteeStore {
26    tables: CommitteeStoreTables,
27    cache: RwLock<HashMap<EpochId, Arc<Committee>>>,
28}
29
30#[derive(DBMapUtils)]
31pub struct CommitteeStoreTables {
32    /// Map from each epoch ID to the committee information.
33    #[default_options_override_fn = "committee_table_default_config"]
34    committee_map: DBMap<EpochId, Committee>,
35}
36
37// These functions are used to initialize the DB tables
38fn committee_table_default_config() -> DBOptions {
39    default_db_options().optimize_for_point_lookup(64)
40}
41
42impl CommitteeStore {
43    pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
44        let tables = CommitteeStoreTables::open_tables_read_write(
45            path,
46            MetricConf::new("committee"),
47            db_options,
48            None,
49        );
50        let store = Self {
51            tables,
52            cache: RwLock::new(HashMap::new()),
53        };
54        if store
55            .database_is_empty()
56            .expect("CommitteeStore initialization failed")
57        {
58            store
59                .init_genesis_committee(genesis_committee.clone())
60                .expect("Init genesis committee data must not fail");
61        }
62        store
63    }
64
65    pub fn new_for_testing(genesis_committee: &Committee) -> Self {
66        let dir = std::env::temp_dir();
67        let path = dir.join(format!("DB_{:?}", nondeterministic!(ObjectID::random())));
68        Self::new(path, genesis_committee, None)
69    }
70
71    pub fn init_genesis_committee(&self, genesis_committee: Committee) -> IotaResult {
72        assert_eq!(genesis_committee.epoch, 0);
73        self.tables.committee_map.insert(&0, &genesis_committee)?;
74        self.cache.write().insert(0, Arc::new(genesis_committee));
75        Ok(())
76    }
77
78    pub fn insert_new_committee(&self, new_committee: &Committee) -> IotaResult {
79        if let Some(old_committee) = self.get_committee(&new_committee.epoch)? {
80            // If somehow we already have this committee in the store, they must be the
81            // same.
82            assert_eq!(&*old_committee, new_committee);
83        } else {
84            self.tables
85                .committee_map
86                .insert(&new_committee.epoch, new_committee)?;
87            self.cache
88                .write()
89                .insert(new_committee.epoch, Arc::new(new_committee.clone()));
90        }
91        Ok(())
92    }
93
94    pub fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Option<Arc<Committee>>> {
95        if let Some(committee) = self.cache.read().get(epoch_id) {
96            return Ok(Some(committee.clone()));
97        }
98        let committee = self.tables.committee_map.get(epoch_id)?;
99        let committee = committee.map(Arc::new);
100        if let Some(committee) = committee.as_ref() {
101            self.cache.write().insert(*epoch_id, committee.clone());
102        }
103        Ok(committee)
104    }
105
106    // todo - make use of cache or remove this method
107    pub fn get_latest_committee(&self) -> IotaResult<Committee> {
108        Ok(self
109            .tables
110            .committee_map
111            .reversed_safe_iter_with_bounds(None, None)?
112            .next()
113            .transpose()?
114            // unwrap safe because we guarantee there is at least a genesis epoch
115            // when initializing the store.
116            .unwrap()
117            .1)
118    }
119    /// Return the committee specified by `epoch`. If `epoch` is `None`, return
120    /// the latest committee.
121    // todo - make use of cache or remove this method
122    pub fn get_or_latest_committee(&self, epoch: Option<EpochId>) -> IotaResult<Committee> {
123        Ok(match epoch {
124            Some(epoch) => self
125                .get_committee(&epoch)?
126                .ok_or(IotaError::MissingCommitteeAtEpoch(epoch))
127                .map(|c| Committee::clone(&*c))?,
128            None => self.get_latest_committee()?,
129        })
130    }
131
132    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
133        self.tables
134            .committee_map
135            .checkpoint_db(path)
136            .map_err(Into::into)
137    }
138
139    fn database_is_empty(&self) -> IotaResult<bool> {
140        Ok(self
141            .tables
142            .committee_map
143            .safe_iter()
144            .next()
145            .transpose()?
146            .is_none())
147    }
148}