// consensus_core/storage/rocksdb_store.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5use std::{ops::Bound::Included, time::Duration};
6
7use bytes::Bytes;
8use consensus_config::AuthorityIndex;
9use iota_macros::fail_point;
10use typed_store::{
11    Map as _,
12    metrics::SamplingInterval,
13    reopen,
14    rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options, open_cf_opts},
15};
16
17use super::{CommitInfo, Store, WriteBatch};
18use crate::{
19    block::{BlockAPI as _, BlockDigest, BlockRef, Round, SignedBlock, VerifiedBlock},
20    commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
21    error::{ConsensusError, ConsensusResult},
22    metrics::StoredScoringMetricsU64,
23};
24
25/// Persistent storage with RocksDB.
26pub(crate) struct RocksDBStore {
27    /// Stores SignedBlock by refs.
28    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
29    /// A secondary index that orders refs first by authors.
30    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
31    /// Maps commit index to Commit.
32    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
33    /// Collects votes on commits.
34    /// TODO: batch multiple votes into a single row.
35    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
36    /// Stores info related to Commit that helps recovery.
37    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
38    /// Stores scoring metrics for each authority.
39    scoring_metrics: DBMap<AuthorityIndex, StoredScoringMetricsU64>,
40}
41
42impl RocksDBStore {
43    const BLOCKS_CF: &'static str = "blocks";
44    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
45    const COMMITS_CF: &'static str = "commits";
46    const COMMIT_VOTES_CF: &'static str = "commit_votes";
47    const COMMIT_INFO_CF: &'static str = "commit_info";
48    const SCORING_METRICS_CF: &'static str = "scoring_metrics";
49
50    /// Creates a new instance of RocksDB storage.
51    pub(crate) fn new(path: &str) -> Self {
52        // Consensus data has high write throughput (all transactions) and is rarely
53        // read (only during recovery and when helping peers catch up).
54        let db_options = default_db_options().optimize_db_for_write_throughput(2);
55        let mut metrics_conf = MetricConf::new("consensus");
56        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
57        let cf_options = default_db_options().optimize_for_write_throughput().options;
58        let column_family_options = vec![
59            (
60                Self::BLOCKS_CF,
61                default_db_options()
62                    .optimize_for_write_throughput_no_deletion()
63                    // Using larger block is ok since there is not much point reads on the cf.
64                    .set_block_options(512, 128 << 10)
65                    .options,
66            ),
67            (Self::DIGESTS_BY_AUTHORITIES_CF, cf_options.clone()),
68            (Self::COMMITS_CF, cf_options.clone()),
69            (Self::COMMIT_VOTES_CF, cf_options.clone()),
70            (Self::COMMIT_INFO_CF, cf_options.clone()),
71            (Self::SCORING_METRICS_CF, cf_options.clone()),
72        ];
73        let rocksdb = open_cf_opts(
74            path,
75            Some(db_options.options),
76            metrics_conf,
77            &column_family_options,
78        )
79        .expect("Cannot open database");
80
81        let (blocks, digests_by_authorities, commits, commit_votes, commit_info, scoring_metrics) = reopen!(&rocksdb,
82            Self::BLOCKS_CF;<(Round, AuthorityIndex, BlockDigest), bytes::Bytes>,
83            Self::DIGESTS_BY_AUTHORITIES_CF;<(AuthorityIndex, Round, BlockDigest), ()>,
84            Self::COMMITS_CF;<(CommitIndex, CommitDigest), Bytes>,
85            Self::COMMIT_VOTES_CF;<(CommitIndex, CommitDigest, BlockRef), ()>,
86            Self::COMMIT_INFO_CF;<(CommitIndex, CommitDigest), CommitInfo>,
87            Self::SCORING_METRICS_CF;<AuthorityIndex, StoredScoringMetricsU64>
88        );
89
90        Self {
91            blocks,
92            digests_by_authorities,
93            commits,
94            commit_votes,
95            commit_info,
96            scoring_metrics,
97        }
98    }
99}
100
101impl Store for RocksDBStore {
    /// Persists blocks, commits, commit info and scoring metrics from
    /// `write_batch` through a single RocksDB write batch, so all column
    /// families are updated atomically.
    ///
    /// Returns `ConsensusError::RocksDBFailure` if any batch insert or the
    /// final write fails.
    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
        fail_point!("consensus-store-before-write");

        let mut batch = self.blocks.batch();
        for block in write_batch.blocks {
            let block_ref = block.reference();
            // Primary block table, keyed (round, author, digest).
            batch
                .insert_batch(
                    &self.blocks,
                    [(
                        (block_ref.round, block_ref.author, block_ref.digest),
                        block.serialized(),
                    )],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Secondary index keyed author-first, enabling per-author scans.
            batch
                .insert_batch(
                    &self.digests_by_authorities,
                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Record each commit vote carried inside the block, one row per vote.
            for vote in block.commit_votes() {
                batch
                    .insert_batch(
                        &self.commit_votes,
                        [((vote.index, vote.digest, block_ref), ())],
                    )
                    .map_err(ConsensusError::RocksDBFailure)?;
            }
        }

        // Serialized commits, keyed by (index, digest).
        for commit in write_batch.commits {
            batch
                .insert_batch(
                    &self.commits,
                    [((commit.index(), commit.digest()), commit.serialized())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        // Recovery metadata associated with specific commits.
        for (commit_ref, commit_info) in write_batch.commit_info {
            batch
                .insert_batch(
                    &self.commit_info,
                    [((commit_ref.index, commit_ref.digest), commit_info)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }
        // Latest scoring metrics per authority (overwrites previous values).
        for (authority, metrics) in write_batch.scoring_metrics {
            batch
                .insert_batch(&self.scoring_metrics, [(authority, metrics)])
                .map_err(ConsensusError::RocksDBFailure)?;
        }
        // Single atomic write of everything accumulated above.
        batch.write()?;
        fail_point!("consensus-store-after-write");
        Ok(())
    }
159
160    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
161        let keys = refs
162            .iter()
163            .map(|r| (r.round, r.author, r.digest))
164            .collect::<Vec<_>>();
165        let serialized = self.blocks.multi_get(keys)?;
166        let mut blocks = vec![];
167        for (key, serialized) in refs.iter().zip(serialized) {
168            if let Some(serialized) = serialized {
169                let signed_block: SignedBlock =
170                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
171                // Only accepted blocks should have been written to storage.
172                let block = VerifiedBlock::new_verified(signed_block, serialized);
173                // Makes sure block data is not corrupted, by comparing digests.
174                assert_eq!(*key, block.reference());
175                blocks.push(Some(block));
176            } else {
177                blocks.push(None);
178            }
179        }
180        Ok(blocks)
181    }
182
183    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
184        let refs = refs
185            .iter()
186            .map(|r| (r.round, r.author, r.digest))
187            .collect::<Vec<_>>();
188        let exist = self.blocks.multi_contains_keys(refs)?;
189        Ok(exist)
190    }
191
192    fn contains_block_at_slot(&self, slot: crate::block::Slot) -> ConsensusResult<bool> {
193        let found = self
194            .digests_by_authorities
195            .safe_range_iter((
196                Included((slot.authority, slot.round, BlockDigest::MIN)),
197                Included((slot.authority, slot.round, BlockDigest::MAX)),
198            ))
199            .next()
200            .is_some();
201        Ok(found)
202    }
203
204    fn scan_blocks_by_author(
205        &self,
206        author: AuthorityIndex,
207        start_round: Round,
208    ) -> ConsensusResult<Vec<VerifiedBlock>> {
209        let mut refs = vec![];
210        for kv in self.digests_by_authorities.safe_range_iter((
211            Included((author, start_round, BlockDigest::MIN)),
212            Included((author, Round::MAX, BlockDigest::MAX)),
213        )) {
214            let ((author, round, digest), _) = kv?;
215            refs.push(BlockRef::new(round, author, digest));
216        }
217        let results = self.read_blocks(refs.as_slice())?;
218        let mut blocks = Vec::with_capacity(refs.len());
219        for (r, block) in refs.into_iter().zip(results.into_iter()) {
220            blocks.push(
221                block.unwrap_or_else(|| panic!("Storage inconsistency: block {r:?} not found!")),
222            );
223        }
224        Ok(blocks)
225    }
226
227    fn scan_metrics(&self) -> ConsensusResult<Vec<(AuthorityIndex, StoredScoringMetricsU64)>> {
228        let mut metrics_by_author = vec![];
229        for kv in self.scoring_metrics.safe_iter() {
230            metrics_by_author.push(kv?);
231        }
232        Ok(metrics_by_author)
233    }
234
235    // The method returns the last `num_of_rounds` rounds blocks by author in round
236    // ascending order. When a `before_round` is defined then the blocks of
237    // round `<=before_round` are returned. If not then the max value for round
238    // will be used as cut off.
239    fn scan_last_blocks_by_author(
240        &self,
241        author: AuthorityIndex,
242        num_of_rounds: u64,
243        before_round: Option<Round>,
244    ) -> ConsensusResult<Vec<VerifiedBlock>> {
245        let before_round = before_round.unwrap_or(Round::MAX);
246        let mut refs = std::collections::VecDeque::new();
247        for kv in self
248            .digests_by_authorities
249            .reversed_safe_iter_with_bounds(
250                Some((author, Round::MIN, BlockDigest::MIN)),
251                Some((author, before_round, BlockDigest::MAX)),
252            )?
253            .take(num_of_rounds as usize)
254        {
255            let ((author, round, digest), _) = kv?;
256            refs.push_front(BlockRef::new(round, author, digest));
257        }
258        let results = self.read_blocks(refs.as_slices().0)?;
259        let mut blocks = vec![];
260        for (r, block) in refs.into_iter().zip(results.into_iter()) {
261            blocks.push(
262                block.unwrap_or_else(|| panic!("Storage inconsistency: block {r:?} not found!")),
263            );
264        }
265        Ok(blocks)
266    }
267
268    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
269        let Some(result) = self
270            .commits
271            .reversed_safe_iter_with_bounds(None, None)?
272            .next()
273        else {
274            return Ok(None);
275        };
276        let ((_index, digest), serialized) = result?;
277        let commit = TrustedCommit::new_trusted(
278            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
279            serialized,
280        );
281        assert_eq!(commit.digest(), digest);
282        Ok(Some(commit))
283    }
284
285    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
286        let mut commits = vec![];
287        for result in self.commits.safe_range_iter((
288            Included((range.start(), CommitDigest::MIN)),
289            Included((range.end(), CommitDigest::MAX)),
290        )) {
291            let ((_index, digest), serialized) = result?;
292            let commit = TrustedCommit::new_trusted(
293                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
294                serialized,
295            );
296            assert_eq!(commit.digest(), digest);
297            commits.push(commit);
298        }
299        Ok(commits)
300    }
301
302    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
303        let mut votes = Vec::new();
304        for vote in self.commit_votes.safe_range_iter((
305            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
306            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
307        )) {
308            let ((_, _, block_ref), _) = vote?;
309            votes.push(block_ref);
310        }
311        Ok(votes)
312    }
313
314    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
315        let Some(result) = self
316            .commit_info
317            .reversed_safe_iter_with_bounds(None, None)?
318            .next()
319        else {
320            return Ok(None);
321        };
322        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
323        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
324    }
325}