iota_core/epoch/
committee_store.rs1use std::{
6 collections::HashMap,
7 path::{Path, PathBuf},
8 sync::Arc,
9};
10
11use iota_macros::nondeterministic;
12use iota_types::{
13 base_types::ObjectID,
14 committee::{Committee, EpochId},
15 error::{IotaError, IotaResult},
16};
17use parking_lot::RwLock;
18use typed_store::{
19 DBMapUtils, Map,
20 rocks::{DBMap, DBOptions, MetricConf, default_db_options},
21 rocksdb::Options,
22 traits::{TableSummary, TypedStoreDebug},
23};
24
25pub struct CommitteeStore {
26 tables: CommitteeStoreTables,
27 cache: RwLock<HashMap<EpochId, Arc<Committee>>>,
28}
29
30#[derive(DBMapUtils)]
31pub struct CommitteeStoreTables {
32 #[default_options_override_fn = "committee_table_default_config"]
34 committee_map: DBMap<EpochId, Committee>,
35}
36
37fn committee_table_default_config() -> DBOptions {
39 default_db_options().optimize_for_point_lookup(64)
40}
41
42impl CommitteeStore {
43 pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
44 let tables = CommitteeStoreTables::open_tables_read_write(
45 path,
46 MetricConf::new("committee"),
47 db_options,
48 None,
49 );
50 let store = Self {
51 tables,
52 cache: RwLock::new(HashMap::new()),
53 };
54 if store.database_is_empty() {
55 store
56 .init_genesis_committee(genesis_committee.clone())
57 .expect("Init genesis committee data must not fail");
58 }
59 store
60 }
61
62 pub fn new_for_testing(genesis_committee: &Committee) -> Self {
63 let dir = std::env::temp_dir();
64 let path = dir.join(format!("DB_{:?}", nondeterministic!(ObjectID::random())));
65 Self::new(path, genesis_committee, None)
66 }
67
68 pub fn init_genesis_committee(&self, genesis_committee: Committee) -> IotaResult {
69 assert_eq!(genesis_committee.epoch, 0);
70 self.tables.committee_map.insert(&0, &genesis_committee)?;
71 self.cache.write().insert(0, Arc::new(genesis_committee));
72 Ok(())
73 }
74
75 pub fn insert_new_committee(&self, new_committee: &Committee) -> IotaResult {
76 if let Some(old_committee) = self.get_committee(&new_committee.epoch)? {
77 assert_eq!(&*old_committee, new_committee);
80 } else {
81 self.tables
82 .committee_map
83 .insert(&new_committee.epoch, new_committee)?;
84 self.cache
85 .write()
86 .insert(new_committee.epoch, Arc::new(new_committee.clone()));
87 }
88 Ok(())
89 }
90
91 pub fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Option<Arc<Committee>>> {
92 if let Some(committee) = self.cache.read().get(epoch_id) {
93 return Ok(Some(committee.clone()));
94 }
95 let committee = self.tables.committee_map.get(epoch_id)?;
96 let committee = committee.map(Arc::new);
97 if let Some(committee) = committee.as_ref() {
98 self.cache.write().insert(*epoch_id, committee.clone());
99 }
100 Ok(committee)
101 }
102
103 pub fn get_latest_committee(&self) -> Committee {
105 self.tables
106 .committee_map
107 .unbounded_iter()
108 .skip_to_last()
109 .next()
110 .unwrap()
113 .1
114 }
115 pub fn get_or_latest_committee(&self, epoch: Option<EpochId>) -> IotaResult<Committee> {
119 Ok(match epoch {
120 Some(epoch) => self
121 .get_committee(&epoch)?
122 .ok_or(IotaError::MissingCommitteeAtEpoch(epoch))
123 .map(|c| Committee::clone(&*c))?,
124 None => self.get_latest_committee(),
125 })
126 }
127
128 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
129 self.tables
130 .committee_map
131 .checkpoint_db(path)
132 .map_err(Into::into)
133 }
134
135 fn database_is_empty(&self) -> bool {
136 self.tables.committee_map.unbounded_iter().next().is_none()
137 }
138}