consensus_core/storage/rocksdb_store.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{ops::Bound::Included, time::Duration};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use iota_macros::fail_point;
use typed_store::{
    Map as _,
    metrics::SamplingInterval,
    reopen,
    rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options, open_cf_opts},
};

use super::{CommitInfo, Store, WriteBatch};
use crate::{
    block::{BlockAPI as _, BlockDigest, BlockRef, Round, SignedBlock, VerifiedBlock},
    commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
    error::{ConsensusError, ConsensusResult},
};

/// Persistent storage with RocksDB.
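/// Keys are tuples whose lexicographic order matches the range scans used
/// below, e.g. the author index is scanned by (author, round) and commits,
/// votes and commit info are scanned by commit index.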
pub(crate) struct RocksDBStore {
    /// Stores SignedBlocks by their refs.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// A secondary index that orders refs first by author.
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Maps commit index to Commit.
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Collects votes on commits.
    /// TODO: batch multiple votes into a single row.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// Stores info related to Commit that helps with recovery.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
}

impl RocksDBStore {
    const BLOCKS_CF: &'static str = "blocks";
    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
    const COMMITS_CF: &'static str = "commits";
    const COMMIT_VOTES_CF: &'static str = "commit_votes";
    const COMMIT_INFO_CF: &'static str = "commit_info";

    /// Creates a new instance of RocksDB storage.
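    ///
    /// Usage (a minimal sketch; the path below is hypothetical and must point
    /// to a writable directory):
    /// ```ignore
    /// let store = RocksDBStore::new("/path/to/consensus_db");
    /// ```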
    pub(crate) fn new(path: &str) -> Self {
        // Consensus data has high write throughput (all transactions) and is rarely
        // read (only during recovery and when helping peers catch up).
        let db_options = default_db_options().optimize_db_for_write_throughput(2);
        let mut metrics_conf = MetricConf::new("consensus");
        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
        let cf_options = default_db_options().optimize_for_write_throughput().options;
        let column_family_options = vec![
            (
                Self::BLOCKS_CF,
                default_db_options()
                    .optimize_for_write_throughput_no_deletion()
                    // Using a larger block size is ok since there are not many
                    // point reads on this cf.
                    .set_block_options(512, 128 << 10)
                    .options,
            ),
            (Self::DIGESTS_BY_AUTHORITIES_CF, cf_options.clone()),
            (Self::COMMITS_CF, cf_options.clone()),
            (Self::COMMIT_VOTES_CF, cf_options.clone()),
            (Self::COMMIT_INFO_CF, cf_options.clone()),
        ];
        let rocksdb = open_cf_opts(
            path,
            Some(db_options.options),
            metrics_conf,
            &column_family_options,
        )
        .expect("Cannot open database");

        let (blocks, digests_by_authorities, commits, commit_votes, commit_info) = reopen!(&rocksdb,
            Self::BLOCKS_CF;<(Round, AuthorityIndex, BlockDigest), bytes::Bytes>,
            Self::DIGESTS_BY_AUTHORITIES_CF;<(AuthorityIndex, Round, BlockDigest), ()>,
            Self::COMMITS_CF;<(CommitIndex, CommitDigest), Bytes>,
            Self::COMMIT_VOTES_CF;<(CommitIndex, CommitDigest, BlockRef), ()>,
            Self::COMMIT_INFO_CF;<(CommitIndex, CommitDigest), CommitInfo>
        );

        Self {
            blocks,
            digests_by_authorities,
            commits,
            commit_votes,
            commit_info,
        }
    }
}

impl Store for RocksDBStore {
    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
        fail_point!("consensus-store-before-write");

        let mut batch = self.blocks.batch();
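        // All updates below are accumulated into this single batch and
        // written to RocksDB atomically at the end of this method.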
        for block in write_batch.blocks {
            let block_ref = block.reference();
            batch
                .insert_batch(
                    &self.blocks,
                    [(
                        (block_ref.round, block_ref.author, block_ref.digest),
                        block.serialized(),
                    )],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            batch
                .insert_batch(
                    &self.digests_by_authorities,
                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
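            // Index the commit votes carried by this block, keyed by
            // (commit index, commit digest, voting block ref).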
            for vote in block.commit_votes() {
                batch
                    .insert_batch(
                        &self.commit_votes,
                        [((vote.index, vote.digest, block_ref), ())],
                    )
                    .map_err(ConsensusError::RocksDBFailure)?;
            }
        }

        for commit in write_batch.commits {
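            // Commits are stored as serialized bytes, keyed by (index, digest).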
            batch
                .insert_batch(
                    &self.commits,
                    [((commit.index(), commit.digest()), commit.serialized())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, commit_info) in write_batch.commit_info {
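            // Recovery-related info is keyed by the commit's (index, digest).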
            batch
                .insert_batch(
                    &self.commit_info,
                    [((commit_ref.index, commit_ref.digest), commit_info)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        batch.write()?;
        fail_point!("consensus-store-after-write");
        Ok(())
    }

    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
        let keys = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let serialized = self.blocks.multi_get(keys)?;
        let mut blocks = vec![];
        for (key, serialized) in refs.iter().zip(serialized) {
            if let Some(serialized) = serialized {
                let signed_block: SignedBlock =
                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
                // Only accepted blocks should have been written to storage.
                let block = VerifiedBlock::new_verified(signed_block, serialized);
                // Make sure block data is not corrupted by comparing digests.
                assert_eq!(*key, block.reference());
                blocks.push(Some(block));
            } else {
                blocks.push(None);
            }
        }
        Ok(blocks)
    }

    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
        let refs = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let exist = self.blocks.multi_contains_keys(refs)?;
        Ok(exist)
    }

    fn contains_block_at_slot(&self, slot: crate::block::Slot) -> ConsensusResult<bool> {
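        // Any entry in the (authority, round) key range of the author index
        // means a block exists at this slot.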
        let found = self
            .digests_by_authorities
            .safe_range_iter((
                Included((slot.authority, slot.round, BlockDigest::MIN)),
                Included((slot.authority, slot.round, BlockDigest::MAX)),
            ))
            .next()
            .is_some();
        Ok(found)
    }

    fn scan_blocks_by_author(
        &self,
        author: AuthorityIndex,
        start_round: Round,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut refs = vec![];
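        // Collect all refs for this author from `start_round` onwards via the
        // author-keyed index, then load the blocks themselves.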
        for kv in self.digests_by_authorities.safe_range_iter((
            Included((author, start_round, BlockDigest::MIN)),
            Included((author, Round::MAX, BlockDigest::MAX)),
        )) {
            let ((author, round, digest), _) = kv?;
            refs.push(BlockRef::new(round, author, digest));
        }
        let results = self.read_blocks(refs.as_slice())?;
        let mut blocks = Vec::with_capacity(refs.len());
        for (r, block) in refs.into_iter().zip(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
            );
        }
        Ok(blocks)
    }

    // Returns blocks authored by `author` from the last `num_of_rounds` rounds,
    // in ascending round order. When `before_round` is defined, only blocks with
    // round <= `before_round` are returned; otherwise the maximum round value is
    // used as the cutoff.
    fn scan_last_blocks_by_author(
        &self,
        author: AuthorityIndex,
        num_of_rounds: u64,
        before_round: Option<Round>,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let before_round = before_round.unwrap_or(Round::MAX);
        let mut refs = std::collections::VecDeque::new();
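        // Iterate the author's refs backwards from the highest key and keep up
        // to `num_of_rounds` entries; push_front restores ascending order.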
        for kv in self
            .digests_by_authorities
            .safe_range_iter((
                Included((author, Round::MIN, BlockDigest::MIN)),
                Included((author, before_round, BlockDigest::MAX)),
            ))
            .skip_to_last()
            .reverse()
            .take(num_of_rounds as usize)
        {
            let ((author, round, digest), _) = kv?;
            refs.push_front(BlockRef::new(round, author, digest));
        }
        let results = self.read_blocks(refs.as_slices().0)?;
        let mut blocks = vec![];
        for (r, block) in refs.into_iter().zip(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
            );
        }
        Ok(blocks)
    }

    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
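        // The last key in the commits column family has the highest commit index.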
        let Some(result) = self.commits.safe_iter().skip_to_last().next() else {
            return Ok(None);
        };
        let ((_index, digest), serialized) = result?;
        let commit = TrustedCommit::new_trusted(
            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
            serialized,
        );
        assert_eq!(commit.digest(), digest);
        Ok(Some(commit))
    }

    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
        let mut commits = vec![];
        for result in self.commits.safe_range_iter((
            Included((range.start(), CommitDigest::MIN)),
            Included((range.end(), CommitDigest::MAX)),
        )) {
            let ((_index, digest), serialized) = result?;
            let commit = TrustedCommit::new_trusted(
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
                serialized,
            );
            assert_eq!(commit.digest(), digest);
            commits.push(commit);
        }
        Ok(commits)
    }

    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
        let mut votes = Vec::new();
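        // Scan all votes recorded for this commit index, across all commit digests.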
        for vote in self.commit_votes.safe_range_iter((
            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
        )) {
            let ((_, _, block_ref), _) = vote?;
            votes.push(block_ref);
        }
        Ok(votes)
    }

    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
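        // The last key in the commit_info column family corresponds to the
        // highest commit index.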
        let Some(result) = self.commit_info.safe_iter().skip_to_last().next() else {
            return Ok(None);
        };
        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
    }
}