1use std::{ops::Bound::Included, time::Duration};
6
7use bytes::Bytes;
8use consensus_config::AuthorityIndex;
9use iota_macros::fail_point;
10use typed_store::{
11 Map as _,
12 metrics::SamplingInterval,
13 reopen,
14 rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options, open_cf_opts},
15};
16
17use super::{CommitInfo, Store, WriteBatch};
18use crate::{
19 block::{BlockAPI as _, BlockDigest, BlockRef, Round, SignedBlock, VerifiedBlock},
20 commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
21 error::{ConsensusError, ConsensusResult},
22 metrics::StoredScoringMetricsU64,
23};
24
/// Persistent consensus storage backed by RocksDB column families.
pub(crate) struct RocksDBStore {
    // Serialized block bytes, keyed round-first so range scans by round are cheap.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    // Authority-first index over block digests; used for per-authority scans
    // (see `scan_blocks_by_author` / `scan_last_blocks_by_author`).
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    // Serialized commit bytes, keyed by (commit index, commit digest).
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    // Presence set recording which block (BlockRef) voted for which commit.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    // Extra per-commit metadata (`CommitInfo`), keyed like `commits`.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    // Scoring metrics stored per authority.
    scoring_metrics: DBMap<AuthorityIndex, StoredScoringMetricsU64>,
}
41
42impl RocksDBStore {
43 const BLOCKS_CF: &'static str = "blocks";
44 const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
45 const COMMITS_CF: &'static str = "commits";
46 const COMMIT_VOTES_CF: &'static str = "commit_votes";
47 const COMMIT_INFO_CF: &'static str = "commit_info";
48 const SCORING_METRICS_CF: &'static str = "scoring_metrics";
49
50 pub(crate) fn new(path: &str) -> Self {
52 let db_options = default_db_options().optimize_db_for_write_throughput(2);
55 let mut metrics_conf = MetricConf::new("consensus");
56 metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
57 let cf_options = default_db_options().optimize_for_write_throughput().options;
58 let column_family_options = vec![
59 (
60 Self::BLOCKS_CF,
61 default_db_options()
62 .optimize_for_write_throughput_no_deletion()
63 .set_block_options(512, 128 << 10)
65 .options,
66 ),
67 (Self::DIGESTS_BY_AUTHORITIES_CF, cf_options.clone()),
68 (Self::COMMITS_CF, cf_options.clone()),
69 (Self::COMMIT_VOTES_CF, cf_options.clone()),
70 (Self::COMMIT_INFO_CF, cf_options.clone()),
71 (Self::SCORING_METRICS_CF, cf_options.clone()),
72 ];
73 let rocksdb = open_cf_opts(
74 path,
75 Some(db_options.options),
76 metrics_conf,
77 &column_family_options,
78 )
79 .expect("Cannot open database");
80
81 let (blocks, digests_by_authorities, commits, commit_votes, commit_info, scoring_metrics) = reopen!(&rocksdb,
82 Self::BLOCKS_CF;<(Round, AuthorityIndex, BlockDigest), bytes::Bytes>,
83 Self::DIGESTS_BY_AUTHORITIES_CF;<(AuthorityIndex, Round, BlockDigest), ()>,
84 Self::COMMITS_CF;<(CommitIndex, CommitDigest), Bytes>,
85 Self::COMMIT_VOTES_CF;<(CommitIndex, CommitDigest, BlockRef), ()>,
86 Self::COMMIT_INFO_CF;<(CommitIndex, CommitDigest), CommitInfo>,
87 Self::SCORING_METRICS_CF;<AuthorityIndex, StoredScoringMetricsU64>
88 );
89
90 Self {
91 blocks,
92 digests_by_authorities,
93 commits,
94 commit_votes,
95 commit_info,
96 scoring_metrics,
97 }
98 }
99}
100
impl Store for RocksDBStore {
    /// Persists blocks, commits, commit info and scoring metrics from
    /// `write_batch` atomically, as a single RocksDB write batch.
    ///
    /// For each block this also updates the authority-first digest index and
    /// records the commit votes the block carries.
    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
        fail_point!("consensus-store-before-write");

        let mut batch = self.blocks.batch();
        for block in write_batch.blocks {
            let block_ref = block.reference();
            // Primary block store: (round, author, digest) -> serialized block.
            batch
                .insert_batch(
                    &self.blocks,
                    [(
                        (block_ref.round, block_ref.author, block_ref.digest),
                        block.serialized(),
                    )],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Secondary index keyed (author, round, digest) for per-authority scans.
            batch
                .insert_batch(
                    &self.digests_by_authorities,
                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Record each commit vote embedded in the block, keyed by the
            // voted-for commit plus the voting block's reference.
            for vote in block.commit_votes() {
                batch
                    .insert_batch(
                        &self.commit_votes,
                        [((vote.index, vote.digest, block_ref), ())],
                    )
                    .map_err(ConsensusError::RocksDBFailure)?;
            }
        }

        for commit in write_batch.commits {
            batch
                .insert_batch(
                    &self.commits,
                    [((commit.index(), commit.digest()), commit.serialized())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, commit_info) in write_batch.commit_info {
            batch
                .insert_batch(
                    &self.commit_info,
                    [((commit_ref.index, commit_ref.digest), commit_info)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }
        for (authority, metrics) in write_batch.scoring_metrics {
            batch
                .insert_batch(&self.scoring_metrics, [(authority, metrics)])
                .map_err(ConsensusError::RocksDBFailure)?;
        }
        // Commit the whole batch atomically.
        batch.write()?;
        fail_point!("consensus-store-after-write");
        Ok(())
    }

    /// Reads blocks for the given refs via a single multi-get.
    ///
    /// The result is positionally aligned with `refs`; missing blocks yield
    /// `None`.
    ///
    /// # Panics
    /// Panics if a stored block's recomputed reference does not match its key
    /// (storage corruption).
    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
        let keys = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let serialized = self.blocks.multi_get(keys)?;
        let mut blocks = vec![];
        for (key, serialized) in refs.iter().zip(serialized) {
            if let Some(serialized) = serialized {
                let signed_block: SignedBlock =
                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
                // Blocks read from storage are trusted; skip re-verification.
                let block = VerifiedBlock::new_verified(signed_block, serialized);
                // Sanity check: the stored bytes must match the key they were
                // indexed under.
                assert_eq!(*key, block.reference());
                blocks.push(Some(block));
            } else {
                blocks.push(None);
            }
        }
        Ok(blocks)
    }

    /// Returns, for each ref, whether the corresponding block exists in storage.
    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
        let refs = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let exist = self.blocks.multi_contains_keys(refs)?;
        Ok(exist)
    }

    /// Returns true if any block exists for the given (authority, round) slot,
    /// regardless of digest.
    fn contains_block_at_slot(&self, slot: crate::block::Slot) -> ConsensusResult<bool> {
        let found = self
            .digests_by_authorities
            // Scan the full digest range of the slot; one hit is enough.
            .safe_range_iter((
                Included((slot.authority, slot.round, BlockDigest::MIN)),
                Included((slot.authority, slot.round, BlockDigest::MAX)),
            ))
            .next()
            .is_some();
        Ok(found)
    }

    /// Returns all of `author`'s blocks with round >= `start_round`, in
    /// ascending key order.
    ///
    /// # Panics
    /// Panics if an index entry has no corresponding block (storage
    /// inconsistency).
    fn scan_blocks_by_author(
        &self,
        author: AuthorityIndex,
        start_round: Round,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut refs = vec![];
        for kv in self.digests_by_authorities.safe_range_iter((
            Included((author, start_round, BlockDigest::MIN)),
            Included((author, Round::MAX, BlockDigest::MAX)),
        )) {
            let ((author, round, digest), _) = kv?;
            refs.push(BlockRef::new(round, author, digest));
        }
        let results = self.read_blocks(refs.as_slice())?;
        let mut blocks = Vec::with_capacity(refs.len());
        for (r, block) in refs.into_iter().zip(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {r:?} not found!")),
            );
        }
        Ok(blocks)
    }

    /// Returns all stored scoring metrics, one entry per authority.
    fn scan_metrics(&self) -> ConsensusResult<Vec<(AuthorityIndex, StoredScoringMetricsU64)>> {
        let mut metrics_by_author = vec![];
        for kv in self.scoring_metrics.safe_iter() {
            metrics_by_author.push(kv?);
        }
        Ok(metrics_by_author)
    }

    /// Returns the last blocks of `author` at rounds up to and including
    /// `before_round` (defaults to `Round::MAX`), in ascending round order.
    ///
    /// NOTE(review): `num_of_rounds` caps the number of index *entries* taken,
    /// i.e. blocks; it equals rounds only when the authority has at most one
    /// block per round — confirm against callers.
    ///
    /// # Panics
    /// Panics if an index entry has no corresponding block (storage
    /// inconsistency).
    fn scan_last_blocks_by_author(
        &self,
        author: AuthorityIndex,
        num_of_rounds: u64,
        before_round: Option<Round>,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let before_round = before_round.unwrap_or(Round::MAX);
        let mut refs = std::collections::VecDeque::new();
        // Iterate newest-first; push_front restores ascending order.
        for kv in self
            .digests_by_authorities
            .reversed_safe_iter_with_bounds(
                Some((author, Round::MIN, BlockDigest::MIN)),
                Some((author, before_round, BlockDigest::MAX)),
            )?
            .take(num_of_rounds as usize)
        {
            let ((author, round, digest), _) = kv?;
            refs.push_front(BlockRef::new(round, author, digest));
        }
        // NOTE(review): assumes every element lives in the deque's first slice
        // (the deque is only ever push_front'ed) — confirm VecDeque contiguity
        // holds across growth.
        let results = self.read_blocks(refs.as_slices().0)?;
        let mut blocks = vec![];
        for (r, block) in refs.into_iter().zip(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {r:?} not found!")),
            );
        }
        Ok(blocks)
    }

    /// Returns the commit with the highest (index, digest) key, or `None` when
    /// no commits are stored.
    ///
    /// # Panics
    /// Panics if the deserialized commit's digest does not match its key.
    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
        let Some(result) = self
            .commits
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let ((_index, digest), serialized) = result?;
        // Commits read from storage are trusted; skip re-verification.
        let commit = TrustedCommit::new_trusted(
            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
            serialized,
        );
        assert_eq!(commit.digest(), digest);
        Ok(Some(commit))
    }

    /// Returns all commits with index in `range` (both ends inclusive), in
    /// ascending order.
    ///
    /// # Panics
    /// Panics if a deserialized commit's digest does not match its key.
    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
        let mut commits = vec![];
        for result in self.commits.safe_range_iter((
            Included((range.start(), CommitDigest::MIN)),
            Included((range.end(), CommitDigest::MAX)),
        )) {
            let ((_index, digest), serialized) = result?;
            let commit = TrustedCommit::new_trusted(
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
                serialized,
            );
            assert_eq!(commit.digest(), digest);
            commits.push(commit);
        }
        Ok(commits)
    }

    /// Returns the refs of all blocks that voted for any commit at
    /// `commit_index`, across all commit digests at that index.
    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
        let mut votes = Vec::new();
        for vote in self.commit_votes.safe_range_iter((
            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
        )) {
            let ((_, _, block_ref), _) = vote?;
            votes.push(block_ref);
        }
        Ok(votes)
    }

    /// Returns the commit info with the highest (index, digest) key, together
    /// with its commit ref, or `None` when none is stored.
    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
        let Some(result) = self
            .commit_info
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
    }
}