iota_core/epoch/
committee_store.rs1use std::{
6 collections::HashMap,
7 path::{Path, PathBuf},
8 sync::Arc,
9};
10
11use iota_types::{
12 committee::{Committee, EpochId},
13 error::{IotaError, IotaResult},
14};
15use parking_lot::RwLock;
16use typed_store::{
17 DBMapUtils, Map,
18 rocks::{DBMap, DBOptions, MetricConf, default_db_options},
19 rocksdb::Options,
20};
21
22pub struct CommitteeStore {
23 tables: CommitteeStoreTables,
24 cache: RwLock<HashMap<EpochId, Arc<Committee>>>,
25}
26
27#[derive(DBMapUtils)]
28pub struct CommitteeStoreTables {
29 #[default_options_override_fn = "committee_table_default_config"]
31 committee_map: DBMap<EpochId, Committee>,
32}
33
34fn committee_table_default_config() -> DBOptions {
36 default_db_options().optimize_for_point_lookup(64)
37}
38
39impl CommitteeStore {
40 pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
41 let tables = CommitteeStoreTables::open_tables_read_write(
42 path,
43 MetricConf::new("committee"),
44 db_options,
45 None,
46 );
47 let store = Self {
48 tables,
49 cache: RwLock::new(HashMap::new()),
50 };
51 if store
52 .database_is_empty()
53 .expect("CommitteeStore initialization failed")
54 {
55 store
56 .init_genesis_committee(genesis_committee.clone())
57 .expect("Init genesis committee data must not fail");
58 }
59 store
60 }
61
62 pub fn new_for_testing(genesis_committee: &Committee) -> Self {
63 let path = iota_common::tempdir().keep();
64 Self::new(path, genesis_committee, None)
65 }
66
67 pub fn init_genesis_committee(&self, genesis_committee: Committee) -> IotaResult {
68 assert_eq!(genesis_committee.epoch, 0);
69 self.tables.committee_map.insert(&0, &genesis_committee)?;
70 self.cache.write().insert(0, Arc::new(genesis_committee));
71 Ok(())
72 }
73
74 pub fn insert_new_committee(&self, new_committee: &Committee) -> IotaResult {
75 if let Some(old_committee) = self.get_committee(&new_committee.epoch)? {
76 assert_eq!(&*old_committee, new_committee);
79 } else {
80 self.tables
81 .committee_map
82 .insert(&new_committee.epoch, new_committee)?;
83 self.cache
84 .write()
85 .insert(new_committee.epoch, Arc::new(new_committee.clone()));
86 }
87 Ok(())
88 }
89
90 pub fn get_committee(&self, epoch_id: &EpochId) -> IotaResult<Option<Arc<Committee>>> {
91 if let Some(committee) = self.cache.read().get(epoch_id) {
92 return Ok(Some(committee.clone()));
93 }
94 let committee = self.tables.committee_map.get(epoch_id)?;
95 let committee = committee.map(Arc::new);
96 if let Some(committee) = committee.as_ref() {
97 self.cache.write().insert(*epoch_id, committee.clone());
98 }
99 Ok(committee)
100 }
101
102 pub fn get_latest_committee(&self) -> IotaResult<Committee> {
104 Ok(self
105 .tables
106 .committee_map
107 .reversed_safe_iter_with_bounds(None, None)?
108 .next()
109 .transpose()?
110 .unwrap()
113 .1)
114 }
115 pub fn get_or_latest_committee(&self, epoch: Option<EpochId>) -> IotaResult<Committee> {
119 Ok(match epoch {
120 Some(epoch) => self
121 .get_committee(&epoch)?
122 .ok_or(IotaError::MissingCommitteeAtEpoch(epoch))
123 .map(|c| Committee::clone(&*c))?,
124 None => self.get_latest_committee()?,
125 })
126 }
127
128 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
129 self.tables
130 .committee_map
131 .checkpoint_db(path)
132 .map_err(Into::into)
133 }
134
135 fn database_is_empty(&self) -> IotaResult<bool> {
136 Ok(self
137 .tables
138 .committee_map
139 .safe_iter()
140 .next()
141 .transpose()?
142 .is_none())
143 }
144}