iota_core/epoch/
committee_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    path::{Path, PathBuf},
8    sync::Arc,
9};
10
11use iota_types::{
12    committee::{Committee, EpochId},
13    error::{IotaError, IotaResult},
14};
15use parking_lot::RwLock;
16use typed_store::{
17    DBMapUtils, Map,
18    rocks::{DBMap, DBOptions, MetricConf, default_db_options},
19    rocksdb::Options,
20};
21
22pub struct CommitteeStore {
23    tables: CommitteeStoreTables,
24    cache: RwLock<HashMap<EpochId, Arc<Committee>>>,
25}
26
27#[derive(DBMapUtils)]
28pub struct CommitteeStoreTables {
29    /// Map from each epoch ID to the committee information.
30    #[default_options_override_fn = "committee_table_default_config"]
31    committee_map: DBMap<EpochId, Committee>,
32}
33
34// These functions are used to initialize the DB tables
35fn committee_table_default_config() -> DBOptions {
36    default_db_options().optimize_for_point_lookup(64)
37}
38
39impl CommitteeStore {
40    pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
41        let tables = CommitteeStoreTables::open_tables_read_write(
42            path,
43            MetricConf::new("committee"),
44            db_options,
45            None,
46        );
47        let store = Self {
48            tables,
49            cache: RwLock::new(HashMap::new()),
50        };
51        if store
52            .database_is_empty()
53            .expect("CommitteeStore initialization failed")
54        {
55            store
56                .init_genesis_committee(genesis_committee.clone())
57                .expect("Init genesis committee data must not fail");
58        }
59        store
60    }
61
62    pub fn new_for_testing(genesis_committee: &Committee) -> Self {
63        let path = iota_common::tempdir().keep();
64        Self::new(path, genesis_committee, None)
65    }
66
67    pub fn init_genesis_committee(&self, genesis_committee: Committee) -> IotaResult {
68        assert_eq!(genesis_committee.epoch, 0);
69        self.tables.committee_map.insert(&0, &genesis_committee)?;
70        self.cache.write().insert(0, Arc::new(genesis_committee));
71        Ok(())
72    }
73
74    pub fn insert_new_committee(&self, new_committee: &Committee) -> IotaResult {
75        if let Some(old_committee) = self.get_committee(&new_committee.epoch)? {
76            // If somehow we already have this committee in the store, they must be the
77            // same.
78            assert_eq!(&*old_committee, new_committee);
79        } else {
80            self.tables
81                .committee_map
82                .insert(&new_committee.epoch, new_committee)?;
83            self.cache
84                .write()
85                .insert(new_committee.epoch, Arc::new(new_committee.clone()));
86        }
87        Ok(())
88    }
89
90    pub fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Option<Arc<Committee>>> {
91        if let Some(committee) = self.cache.read().get(epoch_id) {
92            return Ok(Some(committee.clone()));
93        }
94        let committee = self.tables.committee_map.get(epoch_id)?;
95        let committee = committee.map(Arc::new);
96        if let Some(committee) = committee.as_ref() {
97            self.cache.write().insert(*epoch_id, committee.clone());
98        }
99        Ok(committee)
100    }
101
102    // todo - make use of cache or remove this method
103    pub fn get_latest_committee(&self) -> IotaResult<Committee> {
104        Ok(self
105            .tables
106            .committee_map
107            .reversed_safe_iter_with_bounds(None, None)?
108            .next()
109            .transpose()?
110            // unwrap safe because we guarantee there is at least a genesis epoch
111            // when initializing the store.
112            .unwrap()
113            .1)
114    }
115    /// Return the committee specified by `epoch`. If `epoch` is `None`, return
116    /// the latest committee.
117    // todo - make use of cache or remove this method
118    pub fn get_or_latest_committee(&self, epoch: Option<EpochId>) -> IotaResult<Committee> {
119        Ok(match epoch {
120            Some(epoch) => self
121                .get_committee(&epoch)?
122                .ok_or(IotaError::MissingCommitteeAtEpoch(epoch))
123                .map(|c| Committee::clone(&*c))?,
124            None => self.get_latest_committee()?,
125        })
126    }
127
128    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
129        self.tables
130            .committee_map
131            .checkpoint_db(path)
132            .map_err(Into::into)
133    }
134
135    fn database_is_empty(&self) -> IotaResult<bool> {
136        Ok(self
137            .tables
138            .committee_map
139            .safe_iter()
140            .next()
141            .transpose()?
142            .is_none())
143    }
144}