iota_core/epoch/
committee_store.rs1use std::{
6 collections::HashMap,
7 path::{Path, PathBuf},
8 sync::Arc,
9};
10
11use iota_macros::nondeterministic;
12use iota_types::{
13 base_types::ObjectID,
14 committee::{Committee, EpochId},
15 error::{IotaError, IotaResult},
16};
17use parking_lot::RwLock;
18use typed_store::{
19 DBMapUtils, Map,
20 rocks::{DBMap, DBOptions, MetricConf, default_db_options},
21 rocksdb::Options,
22 traits::{TableSummary, TypedStoreDebug},
23};
24
25pub struct CommitteeStore {
26 tables: CommitteeStoreTables,
27 cache: RwLock<HashMap<EpochId, Arc<Committee>>>,
28}
29
30#[derive(DBMapUtils)]
31pub struct CommitteeStoreTables {
32 #[default_options_override_fn = "committee_table_default_config"]
34 committee_map: DBMap<EpochId, Committee>,
35}
36
37fn committee_table_default_config() -> DBOptions {
39 default_db_options().optimize_for_point_lookup(64)
40}
41
42impl CommitteeStore {
43 pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
44 let tables = CommitteeStoreTables::open_tables_read_write(
45 path,
46 MetricConf::new("committee"),
47 db_options,
48 None,
49 );
50 let store = Self {
51 tables,
52 cache: RwLock::new(HashMap::new()),
53 };
54 if store
55 .database_is_empty()
56 .expect("CommitteeStore initialization failed")
57 {
58 store
59 .init_genesis_committee(genesis_committee.clone())
60 .expect("Init genesis committee data must not fail");
61 }
62 store
63 }
64
65 pub fn new_for_testing(genesis_committee: &Committee) -> Self {
66 let dir = std::env::temp_dir();
67 let path = dir.join(format!("DB_{:?}", nondeterministic!(ObjectID::random())));
68 Self::new(path, genesis_committee, None)
69 }
70
71 pub fn init_genesis_committee(&self, genesis_committee: Committee) -> IotaResult {
72 assert_eq!(genesis_committee.epoch, 0);
73 self.tables.committee_map.insert(&0, &genesis_committee)?;
74 self.cache.write().insert(0, Arc::new(genesis_committee));
75 Ok(())
76 }
77
78 pub fn insert_new_committee(&self, new_committee: &Committee) -> IotaResult {
79 if let Some(old_committee) = self.get_committee(&new_committee.epoch)? {
80 assert_eq!(&*old_committee, new_committee);
83 } else {
84 self.tables
85 .committee_map
86 .insert(&new_committee.epoch, new_committee)?;
87 self.cache
88 .write()
89 .insert(new_committee.epoch, Arc::new(new_committee.clone()));
90 }
91 Ok(())
92 }
93
94 pub fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Option<Arc<Committee>>> {
95 if let Some(committee) = self.cache.read().get(epoch_id) {
96 return Ok(Some(committee.clone()));
97 }
98 let committee = self.tables.committee_map.get(epoch_id)?;
99 let committee = committee.map(Arc::new);
100 if let Some(committee) = committee.as_ref() {
101 self.cache.write().insert(*epoch_id, committee.clone());
102 }
103 Ok(committee)
104 }
105
106 pub fn get_latest_committee(&self) -> IotaResult<Committee> {
108 Ok(self
109 .tables
110 .committee_map
111 .reversed_safe_iter_with_bounds(None, None)?
112 .next()
113 .transpose()?
114 .unwrap()
117 .1)
118 }
119 pub fn get_or_latest_committee(&self, epoch: Option<EpochId>) -> IotaResult<Committee> {
123 Ok(match epoch {
124 Some(epoch) => self
125 .get_committee(&epoch)?
126 .ok_or(IotaError::MissingCommitteeAtEpoch(epoch))
127 .map(|c| Committee::clone(&*c))?,
128 None => self.get_latest_committee()?,
129 })
130 }
131
132 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
133 self.tables
134 .committee_map
135 .checkpoint_db(path)
136 .map_err(Into::into)
137 }
138
139 fn database_is_empty(&self) -> IotaResult<bool> {
140 Ok(self
141 .tables
142 .committee_map
143 .safe_iter()
144 .next()
145 .transpose()?
146 .is_none())
147 }
148}