1use std::{ops::Bound::Included, time::Duration};
6
7use bytes::Bytes;
8use consensus_config::AuthorityIndex;
9use iota_macros::fail_point;
10use typed_store::{
11 Map as _,
12 metrics::SamplingInterval,
13 reopen,
14 rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options, open_cf_opts},
15};
16
17use super::{CommitInfo, Store, WriteBatch};
18use crate::{
19 block::{BlockAPI as _, BlockDigest, BlockRef, Round, SignedBlock, VerifiedBlock},
20 commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
21 error::{ConsensusError, ConsensusResult},
22};
23
/// Consensus store backed by RocksDB.
///
/// Each field is a typed `DBMap` handle; the key tuples below determine the
/// iteration order used by the range scans in the `Store` implementation.
pub(crate) struct RocksDBStore {
    /// Serialized blocks, keyed by `(round, authority, digest)`.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// Key-only index of stored blocks, keyed by `(authority, round, digest)` so
    /// that the blocks of one authority can be scanned by round.
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Serialized commits, keyed by `(commit index, commit digest)`.
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Key-only record of commit votes, keyed by the voted commit's index and
    /// digest followed by the reference of the block carrying the vote.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// `CommitInfo` values, keyed by `(commit index, commit digest)`.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
}
38
39impl RocksDBStore {
40 const BLOCKS_CF: &'static str = "blocks";
41 const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
42 const COMMITS_CF: &'static str = "commits";
43 const COMMIT_VOTES_CF: &'static str = "commit_votes";
44 const COMMIT_INFO_CF: &'static str = "commit_info";
45
46 pub(crate) fn new(path: &str) -> Self {
48 let db_options = default_db_options().optimize_db_for_write_throughput(2);
51 let mut metrics_conf = MetricConf::new("consensus");
52 metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
53 let cf_options = default_db_options().optimize_for_write_throughput().options;
54 let column_family_options = vec![
55 (
56 Self::BLOCKS_CF,
57 default_db_options()
58 .optimize_for_write_throughput_no_deletion()
59 .set_block_options(512, 128 << 10)
61 .options,
62 ),
63 (Self::DIGESTS_BY_AUTHORITIES_CF, cf_options.clone()),
64 (Self::COMMITS_CF, cf_options.clone()),
65 (Self::COMMIT_VOTES_CF, cf_options.clone()),
66 (Self::COMMIT_INFO_CF, cf_options.clone()),
67 ];
68 let rocksdb = open_cf_opts(
69 path,
70 Some(db_options.options),
71 metrics_conf,
72 &column_family_options,
73 )
74 .expect("Cannot open database");
75
76 let (blocks, digests_by_authorities, commits, commit_votes, commit_info) = reopen!(&rocksdb,
77 Self::BLOCKS_CF;<(Round, AuthorityIndex, BlockDigest), bytes::Bytes>,
78 Self::DIGESTS_BY_AUTHORITIES_CF;<(AuthorityIndex, Round, BlockDigest), ()>,
79 Self::COMMITS_CF;<(CommitIndex, CommitDigest), Bytes>,
80 Self::COMMIT_VOTES_CF;<(CommitIndex, CommitDigest, BlockRef), ()>,
81 Self::COMMIT_INFO_CF;<(CommitIndex, CommitDigest), CommitInfo>
82 );
83
84 Self {
85 blocks,
86 digests_by_authorities,
87 commits,
88 commit_votes,
89 commit_info,
90 }
91 }
92}
93
94impl Store for RocksDBStore {
95 fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
96 fail_point!("consensus-store-before-write");
97
98 let mut batch = self.blocks.batch();
99 for block in write_batch.blocks {
100 let block_ref = block.reference();
101 batch
102 .insert_batch(
103 &self.blocks,
104 [(
105 (block_ref.round, block_ref.author, block_ref.digest),
106 block.serialized(),
107 )],
108 )
109 .map_err(ConsensusError::RocksDBFailure)?;
110 batch
111 .insert_batch(
112 &self.digests_by_authorities,
113 [((block_ref.author, block_ref.round, block_ref.digest), ())],
114 )
115 .map_err(ConsensusError::RocksDBFailure)?;
116 for vote in block.commit_votes() {
117 batch
118 .insert_batch(
119 &self.commit_votes,
120 [((vote.index, vote.digest, block_ref), ())],
121 )
122 .map_err(ConsensusError::RocksDBFailure)?;
123 }
124 }
125
126 for commit in write_batch.commits {
127 batch
128 .insert_batch(
129 &self.commits,
130 [((commit.index(), commit.digest()), commit.serialized())],
131 )
132 .map_err(ConsensusError::RocksDBFailure)?;
133 }
134
135 for (commit_ref, commit_info) in write_batch.commit_info {
136 batch
137 .insert_batch(
138 &self.commit_info,
139 [((commit_ref.index, commit_ref.digest), commit_info)],
140 )
141 .map_err(ConsensusError::RocksDBFailure)?;
142 }
143
144 batch.write()?;
145 fail_point!("consensus-store-after-write");
146 Ok(())
147 }
148
149 fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
150 let keys = refs
151 .iter()
152 .map(|r| (r.round, r.author, r.digest))
153 .collect::<Vec<_>>();
154 let serialized = self.blocks.multi_get(keys)?;
155 let mut blocks = vec![];
156 for (key, serialized) in refs.iter().zip(serialized) {
157 if let Some(serialized) = serialized {
158 let signed_block: SignedBlock =
159 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
160 let block = VerifiedBlock::new_verified(signed_block, serialized);
162 assert_eq!(*key, block.reference());
164 blocks.push(Some(block));
165 } else {
166 blocks.push(None);
167 }
168 }
169 Ok(blocks)
170 }
171
172 fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
173 let refs = refs
174 .iter()
175 .map(|r| (r.round, r.author, r.digest))
176 .collect::<Vec<_>>();
177 let exist = self.blocks.multi_contains_keys(refs)?;
178 Ok(exist)
179 }
180
181 fn contains_block_at_slot(&self, slot: crate::block::Slot) -> ConsensusResult<bool> {
182 let found = self
183 .digests_by_authorities
184 .safe_range_iter((
185 Included((slot.authority, slot.round, BlockDigest::MIN)),
186 Included((slot.authority, slot.round, BlockDigest::MAX)),
187 ))
188 .next()
189 .is_some();
190 Ok(found)
191 }
192
193 fn scan_blocks_by_author(
194 &self,
195 author: AuthorityIndex,
196 start_round: Round,
197 ) -> ConsensusResult<Vec<VerifiedBlock>> {
198 let mut refs = vec![];
199 for kv in self.digests_by_authorities.safe_range_iter((
200 Included((author, start_round, BlockDigest::MIN)),
201 Included((author, Round::MAX, BlockDigest::MAX)),
202 )) {
203 let ((author, round, digest), _) = kv?;
204 refs.push(BlockRef::new(round, author, digest));
205 }
206 let results = self.read_blocks(refs.as_slice())?;
207 let mut blocks = Vec::with_capacity(refs.len());
208 for (r, block) in refs.into_iter().zip(results.into_iter()) {
209 blocks.push(
210 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
211 );
212 }
213 Ok(blocks)
214 }
215
216 fn scan_last_blocks_by_author(
221 &self,
222 author: AuthorityIndex,
223 num_of_rounds: u64,
224 before_round: Option<Round>,
225 ) -> ConsensusResult<Vec<VerifiedBlock>> {
226 let before_round = before_round.unwrap_or(Round::MAX);
227 let mut refs = std::collections::VecDeque::new();
228 for kv in self
229 .digests_by_authorities
230 .safe_range_iter((
231 Included((author, Round::MIN, BlockDigest::MIN)),
232 Included((author, before_round, BlockDigest::MAX)),
233 ))
234 .skip_to_last()
235 .reverse()
236 .take(num_of_rounds as usize)
237 {
238 let ((author, round, digest), _) = kv?;
239 refs.push_front(BlockRef::new(round, author, digest));
240 }
241 let results = self.read_blocks(refs.as_slices().0)?;
242 let mut blocks = vec![];
243 for (r, block) in refs.into_iter().zip(results.into_iter()) {
244 blocks.push(
245 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
246 );
247 }
248 Ok(blocks)
249 }
250
251 fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
252 let Some(result) = self.commits.safe_iter().skip_to_last().next() else {
253 return Ok(None);
254 };
255 let ((_index, digest), serialized) = result?;
256 let commit = TrustedCommit::new_trusted(
257 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
258 serialized,
259 );
260 assert_eq!(commit.digest(), digest);
261 Ok(Some(commit))
262 }
263
264 fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
265 let mut commits = vec![];
266 for result in self.commits.safe_range_iter((
267 Included((range.start(), CommitDigest::MIN)),
268 Included((range.end(), CommitDigest::MAX)),
269 )) {
270 let ((_index, digest), serialized) = result?;
271 let commit = TrustedCommit::new_trusted(
272 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
273 serialized,
274 );
275 assert_eq!(commit.digest(), digest);
276 commits.push(commit);
277 }
278 Ok(commits)
279 }
280
281 fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
282 let mut votes = Vec::new();
283 for vote in self.commit_votes.safe_range_iter((
284 Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
285 Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
286 )) {
287 let ((_, _, block_ref), _) = vote?;
288 votes.push(block_ref);
289 }
290 Ok(votes)
291 }
292
293 fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
294 let Some(result) = self.commit_info.safe_iter().skip_to_last().next() else {
295 return Ok(None);
296 };
297 let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
298 Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
299 }
300}