typed_store/rocks/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

pub mod errors;
pub(crate) mod iter;
pub(crate) mod keys;
pub(crate) mod safe_iter;
pub mod util;
pub(crate) mod values;

use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashSet},
    env,
    ffi::CStr,
    marker::PhantomData,
    ops::{Bound, RangeBounds},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use bincode::Options;
use collectable::TryExtend;
use iota_macros::{fail_point, nondeterministic};
use itertools::Itertools;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::{
    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
    WriteOptions, checkpoint::Checkpoint, properties,
};
use serde::{Serialize, de::DeserializeOwned};
use tap::TapFallible;
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};

use self::{iter::Iter, keys::Keys, values::Values};
use crate::{
    TypedStoreError,
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    rocks::{
        errors::{
            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
            typed_store_err_from_rocks_err,
        },
        safe_iter::SafeIter,
    },
    traits::{Map, TableSummary},
};

// The write buffer size per RocksDB instance can be set via the env var below.
// If the env var is not set, the default value below (in MiB) is used.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// The write-ahead log size per RocksDB instance can be set via the env var below.
// If the env var is not set, the default value below (in MiB) is used.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Environment variables to control the behavior of write-throughput-optimized
// tables.
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// Set to 1 to disable blob storage for transactions and effects.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

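// Illustrative sketch only (not part of this module's API): an override such
// as DB_WRITE_BUFFER_SIZE_MB is typically resolved by reading the variable and
// falling back to the default when it is unset or unparsable, e.g.
//
//     std::env::var(ENV_VAR_DB_WRITE_BUFFER_SIZE)
//         .ok()
//         .and_then(|v| v.parse::<usize>().ok())
//         .unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
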
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

#[cfg(test)]
mod tests;

/// A helper macro to reopen multiple column families. The macro returns
/// a tuple of `DBMap` structs in the same order in which the column families
/// are listed.
///
/// # Arguments
///
/// * `db` - a reference to a RocksDB object
/// * `cf;<ty,ty>` - a comma-separated list of column families to open. For
///   each column family, provide its name (`cf`) followed by its key and
///   value types as `<K, V>`.
///
/// # Examples
///
/// We successfully open two different column families.
/// ```
/// use typed_store::reopen;
/// use typed_store::rocks::*;
/// use tempfile::tempdir;
/// use prometheus::Registry;
/// use std::sync::Arc;
/// use typed_store::metrics::DBMetrics;
/// use core::fmt::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// const FIRST_CF: &str = "First_CF";
/// const SECOND_CF: &str = "Second_CF";
///
///
/// /// Create the rocks database reference for the desired column families
/// let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &[FIRST_CF, SECOND_CF]).unwrap();
///
/// /// Now simply open all the column families for their expected Key-Value types
/// let (db_map_1, db_map_2) = reopen!(&rocks, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
/// Ok(())
/// }
/// ```
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
            ),*
        )
    };
}

/// Repeatedly attempts an optimistic transaction until it succeeds or the
/// retry limit is reached (20 attempts by default). Since many callsites
/// (e.g. the consensus handler) cannot proceed in the case of failed writes,
/// `retry_transaction_forever!` can be used to retry without limit.
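///
/// # Example
///
/// A minimal sketch of a callsite; the transaction body and the
/// `db: DBMap<u64, String>` map are hypothetical and only for illustration.
///
/// ```ignore
/// // Assumes an enclosing async fn returning Result<(), TypedStoreError>.
/// retry_transaction!({
///     let mut tx = db.transaction()?;
///     tx.insert_batch(&db, [(1u64, "value".to_string())])?;
///     tx.commit()
/// })?;
/// ```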
#[macro_export]
macro_rules! retry_transaction {
    ($transaction:expr) => {
        retry_transaction!($transaction, Some(20))
    };

    (
        $transaction:expr,
        $max_retries:expr // should be an Option<int type>, None for unlimited
        $(,)?

    ) => {{
        use rand::{
            distributions::{Distribution, Uniform},
            rngs::ThreadRng,
        };
        use tokio::time::{Duration, sleep};
        use tracing::{error, info};

        let mut retries = 0;
        let max_retries = $max_retries;
        loop {
            let status = $transaction;
            match status {
                Err(TypedStoreError::RetryableTransaction) => {
                    retries += 1;
                    // Randomized delay to help racing transactions get out of each other's way.
                    let delay = {
                        let mut rng = ThreadRng::default();
                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
                    };
                    if let Some(max_retries) = max_retries {
                        if retries > max_retries {
                            error!(?max_retries, "max retries exceeded");
                            break status;
                        }
                    }
                    if retries > 10 {
                        // TODO: monitoring needed?
                        error!(?delay, ?retries, "excessive transaction retries...");
                    } else {
                        info!(
                            ?delay,
                            ?retries,
                            "transaction write conflict detected, sleeping"
                        );
                    }
                    sleep(delay).await;
                }
                _ => break status,
            }
        }
    }};
}

#[macro_export]
macro_rules! retry_transaction_forever {
    ($transaction:expr) => {
        $crate::retry_transaction!($transaction, None)
    };
}

#[derive(Debug)]
pub struct DBWithThreadModeWrapper {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl DBWithThreadModeWrapper {
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for DBWithThreadModeWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub struct OptimisticTransactionDBWrapper {
    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl OptimisticTransactionDBWrapper {
    fn new(
        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for OptimisticTransactionDBWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

/// Thin wrapper to unify interface across different db types
#[derive(Debug)]
pub enum RocksDB {
    DBWithThreadMode(DBWithThreadModeWrapper),
    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
}

macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
        }
    }
}
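
// For illustration: a call such as `delegate_call!(self.flush())` expands to
// roughly the following, forwarding the method to whichever underlying
// database variant is active:
//
//     match self {
//         Self::DBWithThreadMode(d) => d.underlying.flush(),
//         Self::OptimisticTransactionDB(d) => d.underlying.flush(),
//     }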

impl Drop for RocksDB {
    fn drop(&mut self) {
        delegate_call!(self.cancel_all_background_work(/* wait */ true))
    }
}

impl RocksDB {
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        delegate_call!(self.get(key))
    }

    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        delegate_call!(self.multi_get_cf_opt(keys, readopts))
    }

    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
    }

    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        delegate_call!(self.property_int_value_cf(cf, name))
    }

    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
    }

    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        delegate_call!(self.cf_handle(name))
    }

    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.create_cf(name, opts))
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        delegate_call!(self.drop_cf(name))
    }

    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
        fail_point!("delete-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn path(&self) -> &Path {
        delegate_call!(self.path())
    }

    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
        fail_point!("put-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
    }

    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        delegate_call!(self.try_catch_up_with_primary())
    }

    pub fn write(
        &self,
        batch: RocksDBBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (self, batch) {
            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            _ => Err(TypedStoreError::RocksDB(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn transaction_without_snapshot(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
            Self::DBWithThreadMode(_) => panic!(),
        }
    }

    pub fn transaction(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => {
                let mut tx_opts = OptimisticTransactionOptions::new();
                tx_opts.set_snapshot(true);

                Ok(db
                    .underlying
                    .transaction_opt(&WriteOptions::default(), &tx_opts))
            }
            Self::DBWithThreadMode(_) => panic!(),
        }
    }

    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
            }
            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
            ),
        }
    }

    pub fn iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
        mode: IteratorMode<'_>,
    ) -> RocksDBIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
            }
            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
            ),
        }
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        delegate_call!(self.compact_range_cf(cf, start, end))
    }

    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
        match self {
            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
            Self::OptimisticTransactionDB(d) => {
                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
            }
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint = match self {
            Self::DBWithThreadMode(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
            Self::OptimisticTransactionDB(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
        };
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        delegate_call!(self.flush_cf(cf))
    }

    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.set_options_cf(cf, opts))
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
        }
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
        }
    }

    pub fn db_name(&self) -> String {
        let name = match self {
            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
        };
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        delegate_call!(self.live_files())
    }
}

pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}

impl<'a> RocksDBSnapshot<'a> {
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }
    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}

pub enum RocksDBBatch {
    Regular(rocksdb::WriteBatch),
    Transactional(rocksdb::WriteBatchWithTransaction<true>),
}

macro_rules! delegate_batch_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::Regular(b) => b.$method($($args),*),
            Self::Transactional(b) => b.$method($($args),*),
        }
    }
}

impl RocksDBBatch {
    fn size_in_bytes(&self) -> usize {
        delegate_batch_call!(self.size_in_bytes())
    }

    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        delegate_batch_call!(self.delete_cf(cf, key))
    }

    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.put_cf(cf, key, value))
    }

    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.merge_cf(cf, key, value))
    }

    pub fn delete_range_cf<K: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), TypedStoreError> {
        match self {
            Self::Regular(batch) => {
                batch.delete_range_cf(cf, from, to);
                Ok(())
            }
            Self::Transactional(_) => panic!(),
        }
    }
}

#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

/// An interface to a RocksDB database, keyed by a column family.
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    _phantom: PhantomData<fn(K) -> V>,
    // The RocksDB column family under which the map is stored.
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    /// Opens a database from a path, with specific options and an optional
    /// column family.
    ///
    /// This database is used to perform operations on a single column family,
    /// and parametrizes all operations in `DBBatch` when writing across column
    /// families.
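    ///
    /// # Example
    ///
    /// A minimal sketch (the column family name is illustrative only):
    ///
    /// ```ignore
    /// let map = DBMap::<u64, String>::open(
    ///     tempfile::tempdir().unwrap(),
    ///     MetricConf::default(),
    ///     None,
    ///     Some("Example_CF"),
    ///     &ReadWriteOptions::default(),
    /// )
    /// .expect("Failed to open storage");
    /// ```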
    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }

    /// Reopens an open database as a typed map operating under a specific
    /// column family. If no column family is passed, the default column
    /// family is used.
    ///
    /// ```
    /// use core::fmt::Error;
    /// use std::sync::Arc;
    ///
    /// use prometheus::Registry;
    /// use tempfile::tempdir;
    /// use typed_store::{metrics::DBMetrics, rocks::*};
    /// #[tokio::main]
    /// async fn main() -> Result<(), Error> {
    ///     /// Open the DB with all needed column families first.
    ///     let rocks = open_cf(
    ///         tempdir().unwrap(),
    ///         None,
    ///         MetricConf::default(),
    ///         &["First_CF", "Second_CF"],
    ///     )
    ///     .unwrap();
    ///     /// Attach the column families to specific maps.
    ///     let db_cf_1 = DBMap::<u32, u32>::reopen(
    ///         &rocks,
    ///         Some("First_CF"),
    ///         &ReadWriteOptions::default(),
    ///         false,
    ///     )
    ///     .expect("Failed to open storage");
    ///     let db_cf_2 = DBMap::<u32, u32>::reopen(
    ///         &rocks,
    ///         Some("Second_CF"),
    ///         &ReadWriteOptions::default(),
    ///         false,
    ///     )
    ///     .expect("Failed to open storage");
    ///     Ok(())
    /// }
    /// ```
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

    pub fn batch(&self) -> DBBatch {
        let batch = match *self.rocksdb {
            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
            RocksDB::OptimisticTransactionDB(_) => {
                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
            }
        };
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
        self.rocksdb
            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &'static std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    /// Returns a vector of raw values corresponding to the keys provided.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                // sorted_input=
                false,
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let cf = rocksdb.cf_handle(cf_name).expect("Failed to get cf");
        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new(&self.rocksdb)
    }

    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new_without_snapshot(&self.rocksdb)
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
        Ok(self.rocksdb.snapshot())
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let iter = self.iterator_cf().map(Result::unwrap);
        for (key, value) in iter {
            num_keys += 1;
            key_bytes_total += key.len();
            value_bytes_total += value.len();
            key_hist.record(key.len() as u64)?;
            value_hist.record(value.len() as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    // Creates metrics and context for tracking an iterator's usage and performance.
    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    // Creates RocksDB read options with the specified lower and upper bounds.
    // The lower bound is inclusive, and the upper bound is exclusive.
    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

    // Creates a RocksDB read option with lower and upper bounds set corresponding
    // to `range`.
    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                // RocksDB's lower bound is inclusive by default, so nothing to do.
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                // Since we want an exclusive lower bound, increment the key to exclude the
                // given key.
                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                // If the key is already at the maximum, there is nothing above it, so no
                // upper bound is needed.
                if !is_max(&key_buf) {
                    // RocksDB's upper bound is exclusive, so increment the key to make the
                    // included key visible.
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                // RocksDB's upper bound is exclusive by default, so nothing to do.
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}

/// Provides a mutable struct to form a collection of database write operations
/// and execute them.
///
/// Batching write and delete operations is faster than performing them one by
/// one and ensures their atomicity, i.e. they are all written or none is.
/// This is also true of operations across column families in the same database.
///
/// Serialization, deserialization, and naming of column families are performed
/// by passing a `DBMap<K, V>` with each operation.
///
/// ```
/// use core::fmt::Error;
/// use std::sync::Arc;
///
/// use prometheus::Registry;
/// use tempfile::tempdir;
/// use typed_store::{Map, metrics::DBMetrics, rocks::*};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let rocks = open_cf(
///         tempfile::tempdir().unwrap(),
///         None,
///         MetricConf::default(),
///         &["First_CF", "Second_CF"],
///     )
///     .unwrap();
///
///     let db_cf_1 = DBMap::reopen(
///         &rocks,
///         Some("First_CF"),
///         &ReadWriteOptions::default(),
///         false,
///     )
///     .expect("Failed to open storage");
///     let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
///     let db_cf_2 = DBMap::reopen(
///         &rocks,
///         Some("Second_CF"),
///         &ReadWriteOptions::default(),
///         false,
///     )
///     .expect("Failed to open storage");
///     let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
///     let mut batch = db_cf_1.batch();
///     batch
///         .insert_batch(&db_cf_1, keys_vals_1.clone())
///         .expect("Failed to batch insert")
///         .insert_batch(&db_cf_2, keys_vals_2.clone())
///         .expect("Failed to batch insert");
///
///     let _ = batch.write().expect("Failed to execute batch");
///     for (k, v) in keys_vals_1 {
///         let val = db_cf_1.get(&k).expect("Failed to get inserted key");
///         assert_eq!(Some(v), val);
///     }
///
///     for (k, v) in keys_vals_2 {
///         let val = db_cf_2.get(&k).expect("Failed to get inserted key");
///         assert_eq!(Some(v), val);
///     }
///     Ok(())
/// }
/// ```
pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: RocksDBBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    /// Creates a new batch associated with a DB reference.
    ///
    /// Use `open_cf` to get the DB reference, or use an existing open database.
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: RocksDBBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consume the batch and write its operations to the database
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}

// TODO: Remove this entire implementation once we switch to sally
impl DBBatch {
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);

                Ok(())
            })?;
        Ok(())
    }

    /// Deletes a range of keys between `from` (inclusive) and `to`
    /// (exclusive) by writing a range delete tombstone in the db map.
    /// If the DBMap is configured with ignore_range_deletions set to false,
    /// the effect of this write is visible immediately, i.e. you won't
    /// see old values when you do a lookup or scan. But if it is configured
    /// with ignore_range_deletions set to true, the old values are visible until
    /// compaction actually deletes them, which will happen sometime later. By
    /// default ignore_range_deletions is set to true on a DBMap (unless it is
    /// overridden in the config), so please use this function with caution.
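    ///
    /// # Example
    ///
    /// A minimal sketch (the `db: DBMap<u64, String>` map is illustrative only):
    ///
    /// ```ignore
    /// let mut batch = db.batch();
    /// // Schedules deletion of keys in [10, 20); nothing is deleted until the
    /// // batch is written.
    /// batch.schedule_delete_range(&db, &10u64, &20u64)?;
    /// batch.write()?;
    /// ```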
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from)?;
        let to_buf = be_fix_int_ser(to)?;

        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
        Ok(())
    }

    /// Inserts a range of (key, value) pairs given as an iterator.
    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                self.batch.put_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

1548    /// Merges a batch of (key, value) pairs given as an iterator.
1549    pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1550        &mut self,
1551        db: &DBMap<K, V>,
1552        new_vals: impl IntoIterator<Item = (J, U)>,
1553    ) -> Result<&mut Self, TypedStoreError> {
1554        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1555            return Err(TypedStoreError::CrossDBBatch);
1556        }
1557
1558        new_vals
1559            .into_iter()
1560            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1561                let k_buf = be_fix_int_ser(k.borrow())?;
1562                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1563                self.batch.merge_cf(&db.cf(), k_buf, v_buf);
1564                Ok(())
1565            })?;
1566        Ok(self)
1567    }
1568
1569    /// Similar to `merge_batch`, but allows merging with partial values.
1570    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
1571        &mut self,
1572        db: &DBMap<K, V>,
1573        new_vals: impl IntoIterator<Item = (J, B)>,
1574    ) -> Result<&mut Self, TypedStoreError> {
1575        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1576            return Err(TypedStoreError::CrossDBBatch);
1577        }
1578        new_vals
1579            .into_iter()
1580            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1581                let k_buf = be_fix_int_ser(k.borrow())?;
1582                self.batch.merge_cf(&db.cf(), k_buf, v);
1583                Ok(())
1584            })?;
1585        Ok(self)
1586    }
1587}
1588
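/// An optimistic transaction over a [`RocksDB`] instance opened in
/// transactional mode.
///
/// # Example
///
/// A minimal sketch of a read-modify-write with conflict retry, assuming an
/// already-opened transactional `db: Arc<RocksDB>` and a `table: DBMap<u64, u64>`
/// on the same instance (names and the counter key are illustrative only):
///
/// ```ignore
/// loop {
///     let mut txn = DBTransaction::new(&db)?;
///     let current = txn.get_for_update(&table, &42u64)?.unwrap_or(0);
///     txn.insert_batch(&table, [(42u64, current + 1)])?;
///     match txn.commit() {
///         Ok(()) => break,
///         // Another writer touched the key; retry with a fresh snapshot.
///         Err(TypedStoreError::RetryableTransaction) => continue,
///         Err(e) => return Err(e),
///     }
/// }
/// ```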
1589pub struct DBTransaction<'a> {
1590    rocksdb: Arc<RocksDB>,
1591    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
1592}
1593
1594impl<'a> DBTransaction<'a> {
1595    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1596        Ok(Self {
1597            rocksdb: db.clone(),
1598            transaction: db.transaction()?,
1599        })
1600    }
1601
1602    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1603        Ok(Self {
1604            rocksdb: db.clone(),
1605            transaction: db.transaction_without_snapshot()?,
1606        })
1607    }
1608
1609    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1610        &mut self,
1611        db: &DBMap<K, V>,
1612        new_vals: impl IntoIterator<Item = (J, U)>,
1613    ) -> Result<&mut Self, TypedStoreError> {
1614        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1615            return Err(TypedStoreError::CrossDBBatch);
1616        }
1617
1618        new_vals
1619            .into_iter()
1620            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1621                let k_buf = be_fix_int_ser(k.borrow())?;
1622                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1623                self.transaction
1624                    .put_cf(&db.cf(), k_buf, v_buf)
1625                    .map_err(typed_store_err_from_rocks_err)?;
1626                Ok(())
1627            })?;
1628        Ok(self)
1629    }
1630
1631    /// Deletes a set of keys given as an iterator
1632    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1633        &mut self,
1634        db: &DBMap<K, V>,
1635        purged_vals: impl IntoIterator<Item = J>,
1636    ) -> Result<&mut Self, TypedStoreError> {
1637        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1638            return Err(TypedStoreError::CrossDBBatch);
1639        }
1640        purged_vals
1641            .into_iter()
1642            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1643                let k_buf = be_fix_int_ser(k.borrow())?;
1644                self.transaction
1645                    .delete_cf(&db.cf(), k_buf)
1646                    .map_err(typed_store_err_from_rocks_err)?;
1647                Ok(())
1648            })?;
1649        Ok(self)
1650    }
1651
1652    pub fn snapshot(
1653        &self,
1654    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
1655    {
1656        self.transaction.snapshot()
1657    }
1658
1659    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
1660        &self,
1661        db: &DBMap<K, V>,
1662        key: &K,
1663    ) -> Result<Option<V>, TypedStoreError> {
1664        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1665            return Err(TypedStoreError::CrossDBBatch);
1666        }
1667        let k_buf = be_fix_int_ser(key)?;
1668        match self
1669            .transaction
1670            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
1671            .map_err(typed_store_err_from_rocks_err)?
1672        {
1673            Some(data) => Ok(Some(
1674                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1675            )),
1676            None => Ok(None),
1677        }
1678    }
1679
1680    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
1681        &self,
1682        db: &DBMap<K, V>,
1683        key: &K,
1684    ) -> Result<Option<V>, TypedStoreError> {
1685        let key_buf = be_fix_int_ser(key)?;
1686        self.transaction
1687            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
1688            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
1689            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
1690    }
1691
1692    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
1693        &self,
1694        db: &DBMap<K, V>,
1695        keys: impl IntoIterator<Item = J>,
1696    ) -> Result<Vec<Option<V>>, TypedStoreError> {
1697        let cf = db.cf();
1698        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
1699            .into_iter()
1700            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
1701            .collect();
1702
1703        let results = self
1704            .transaction
1705            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());
1706
1707        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1708            .into_iter()
1709            .map(
1710                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
1711                    Some(data) => Ok(Some(
1712                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1713                    )),
1714                    None => Ok(None),
1715                },
1716            )
1717            .collect();
1718
1719        values_parsed
1720    }
1721
1722    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
1723        &'a self,
1724        db: &DBMap<K, V>,
1725    ) -> Iter<'a, K, V> {
1726        let db_iter = self
1727            .transaction
1728            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
1729        Iter::new(
1730            db.cf.clone(),
1731            RocksDBRawIter::OptimisticTransaction(db_iter),
1732            None,
1733            None,
1734            None,
1735            None,
1736            None,
1737        )
1738    }
1739
1740    pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
1741        &'a self,
1742        db: &DBMap<K, V>,
1743    ) -> Keys<'a, K> {
1744        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1745            self.transaction
1746                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1747        );
1748        db_iter.seek_to_first();
1749
1750        Keys::new(db_iter)
1751    }
1752
1753    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
1754        &'a self,
1755        db: &DBMap<K, V>,
1756    ) -> Values<'a, V> {
1757        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1758            self.transaction
1759                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1760        );
1761        db_iter.seek_to_first();
1762
1763        Values::new(db_iter)
1764    }
1765
1766    pub fn commit(self) -> Result<(), TypedStoreError> {
1767        fail_point!("transaction-commit");
1768        self.transaction.commit().map_err(|e| match e.kind() {
1769            // Empirically, this is what you get when there is a write conflict. It is not
1770            // documented whether this is the only case that produces this error.
1771            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
1772            _ => typed_store_err_from_rocks_err(e),
1773        })?;
1774        Ok(())
1775    }
1776}
1777
1778macro_rules! delegate_iter_call {
1779    ($self:ident.$method:ident($($args:ident),*)) => {
1780        match $self {
1781            Self::DB(db) => db.$method($($args),*),
1782            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
1783            Self::OptimisticTransaction(db) => db.$method($($args),*),
1784        }
1785    }
1786}
1787
1788pub enum RocksDBRawIter<'a> {
1789    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1790    OptimisticTransactionDB(
1791        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1792    ),
1793    OptimisticTransaction(
1794        rocksdb::DBRawIteratorWithThreadMode<
1795            'a,
1796            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1797        >,
1798    ),
1799}
1800
1801impl RocksDBRawIter<'_> {
1802    pub fn valid(&self) -> bool {
1803        delegate_iter_call!(self.valid())
1804    }
1805    pub fn key(&self) -> Option<&[u8]> {
1806        delegate_iter_call!(self.key())
1807    }
1808    pub fn value(&self) -> Option<&[u8]> {
1809        delegate_iter_call!(self.value())
1810    }
1811    pub fn next(&mut self) {
1812        delegate_iter_call!(self.next())
1813    }
1814    pub fn prev(&mut self) {
1815        delegate_iter_call!(self.prev())
1816    }
1817    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
1818        delegate_iter_call!(self.seek(key))
1819    }
1820    pub fn seek_to_last(&mut self) {
1821        delegate_iter_call!(self.seek_to_last())
1822    }
1823    pub fn seek_to_first(&mut self) {
1824        delegate_iter_call!(self.seek_to_first())
1825    }
1826    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
1827        delegate_iter_call!(self.seek_for_prev(key))
1828    }
1829    pub fn status(&self) -> Result<(), rocksdb::Error> {
1830        delegate_iter_call!(self.status())
1831    }
1832}
1833
1834pub enum RocksDBIter<'a> {
1835    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1836    OptimisticTransactionDB(
1837        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1838    ),
1839}
1840
1841impl Iterator for RocksDBIter<'_> {
1842    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
1843    fn next(&mut self) -> Option<Self::Item> {
1844        match self {
1845            Self::DB(db) => db.next(),
1846            Self::OptimisticTransactionDB(db) => db.next(),
1847        }
1848    }
1849}
1850
1851impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1852where
1853    K: Serialize + DeserializeOwned,
1854    V: Serialize + DeserializeOwned,
1855{
1856    type Error = TypedStoreError;
1857    type Iterator = Iter<'a, K, V>;
1858    type SafeIterator = SafeIter<'a, K, V>;
1859    type Keys = Keys<'a, K>;
1860    type Values = Values<'a, V>;
1861
1862    #[instrument(level = "trace", skip_all, err)]
1863    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1864        let key_buf = be_fix_int_ser(key)?;
1865        // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
1866        // but no false negatives. We use it to short-circuit the absent case.
1867        let readopts = self.opts.readopts();
1868        Ok(self
1869            .rocksdb
1870            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1871            && self
1872                .rocksdb
1873                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1874                .map_err(typed_store_err_from_rocks_err)?
1875                .is_some())
1876    }
1877
1878    #[instrument(level = "trace", skip_all, err)]
1879    fn multi_contains_keys<J>(
1880        &self,
1881        keys: impl IntoIterator<Item = J>,
1882    ) -> Result<Vec<bool>, Self::Error>
1883    where
1884        J: Borrow<K>,
1885    {
1886        let values = self.multi_get_pinned(keys)?;
1887        Ok(values.into_iter().map(|v| v.is_some()).collect())
1888    }
1889
1890    #[instrument(level = "trace", skip_all, err)]
1891    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1892        let _timer = self
1893            .db_metrics
1894            .op_metrics
1895            .rocksdb_get_latency_seconds
1896            .with_label_values(&[&self.cf])
1897            .start_timer();
1898        let perf_ctx = if self.get_sample_interval.sample() {
1899            Some(RocksDBPerfContext)
1900        } else {
1901            None
1902        };
1903        let key_buf = be_fix_int_ser(key)?;
1904        let res = self
1905            .rocksdb
1906            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1907            .map_err(typed_store_err_from_rocks_err)?;
1908        self.db_metrics
1909            .op_metrics
1910            .rocksdb_get_bytes
1911            .with_label_values(&[&self.cf])
1912            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1913        if perf_ctx.is_some() {
1914            self.db_metrics
1915                .read_perf_ctx_metrics
1916                .report_metrics(&self.cf);
1917        }
1918        match res {
1919            Some(data) => Ok(Some(
1920                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1921            )),
1922            None => Ok(None),
1923        }
1924    }
1925
1926    #[instrument(level = "trace", skip_all, err)]
1927    fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
1928        let _timer = self
1929            .db_metrics
1930            .op_metrics
1931            .rocksdb_get_latency_seconds
1932            .with_label_values(&[&self.cf])
1933            .start_timer();
1934        let perf_ctx = if self.get_sample_interval.sample() {
1935            Some(RocksDBPerfContext)
1936        } else {
1937            None
1938        };
1939        let key_buf = be_fix_int_ser(key)?;
1940        let res = self
1941            .rocksdb
1942            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1943            .map_err(typed_store_err_from_rocks_err)?;
1944        self.db_metrics
1945            .op_metrics
1946            .rocksdb_get_bytes
1947            .with_label_values(&[&self.cf])
1948            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1949        if perf_ctx.is_some() {
1950            self.db_metrics
1951                .read_perf_ctx_metrics
1952                .report_metrics(&self.cf);
1953        }
1954        match res {
1955            Some(data) => Ok(Some(data.to_vec())),
1956            None => Ok(None),
1957        }
1958    }
1959
1960    #[instrument(level = "trace", skip_all, err)]
1961    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1962        let timer = self
1963            .db_metrics
1964            .op_metrics
1965            .rocksdb_put_latency_seconds
1966            .with_label_values(&[&self.cf])
1967            .start_timer();
1968        let perf_ctx = if self.write_sample_interval.sample() {
1969            Some(RocksDBPerfContext)
1970        } else {
1971            None
1972        };
1973        let key_buf = be_fix_int_ser(key)?;
1974        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1975        self.db_metrics
1976            .op_metrics
1977            .rocksdb_put_bytes
1978            .with_label_values(&[&self.cf])
1979            .observe((key_buf.len() + value_buf.len()) as f64);
1980        if perf_ctx.is_some() {
1981            self.db_metrics
1982                .write_perf_ctx_metrics
1983                .report_metrics(&self.cf);
1984        }
1985        self.rocksdb
1986            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
1987            .map_err(typed_store_err_from_rocks_err)?;
1988
1989        let elapsed = timer.stop_and_record();
1990        if elapsed > 1.0 {
1991            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1992            self.db_metrics
1993                .op_metrics
1994                .rocksdb_very_slow_puts_count
1995                .with_label_values(&[&self.cf])
1996                .inc();
1997            self.db_metrics
1998                .op_metrics
1999                .rocksdb_very_slow_puts_duration_ms
2000                .with_label_values(&[&self.cf])
2001                .inc_by((elapsed * 1000.0) as u64);
2002        }
2003
2004        Ok(())
2005    }
2006
2007    #[instrument(level = "trace", skip_all, err)]
2008    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
2009        let _timer = self
2010            .db_metrics
2011            .op_metrics
2012            .rocksdb_delete_latency_seconds
2013            .with_label_values(&[&self.cf])
2014            .start_timer();
2015        let perf_ctx = if self.write_sample_interval.sample() {
2016            Some(RocksDBPerfContext)
2017        } else {
2018            None
2019        };
2020        let key_buf = be_fix_int_ser(key)?;
2021        self.rocksdb
2022            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
2023            .map_err(typed_store_err_from_rocks_err)?;
2024        self.db_metrics
2025            .op_metrics
2026            .rocksdb_deletes
2027            .with_label_values(&[&self.cf])
2028            .inc();
2029        if perf_ctx.is_some() {
2030            self.db_metrics
2031                .write_perf_ctx_metrics
2032                .report_metrics(&self.cf);
2033        }
2034        Ok(())
2035    }
2036
2037    /// This method first drops the existing column family and then creates a
2038    /// new one with the same name. The two operations are not atomic, so
2039    /// it is possible to hit a race condition where the column
2040    /// family has been dropped but the new one has not been created yet.
2041    #[instrument(level = "trace", skip_all, err)]
2042    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
2043        let _ = self.rocksdb.drop_cf(&self.cf);
2044        self.rocksdb
2045            .create_cf(self.cf.clone(), &default_db_options().options)
2046            .map_err(typed_store_err_from_rocks_err)?;
2047        Ok(())
2048    }
2049
2050    /// Writes a range delete tombstone to delete all entries in the db map.
2051    /// If the DBMap is configured with ignore_range_deletions set to false,
2052    /// the effect of this write is visible immediately, i.e. you won't
2053    /// see old values when you do a lookup or scan. But if it is configured
2054    /// with ignore_range_deletions set to true, the old values remain visible
2055    /// until compaction actually deletes them, which happens some time later. By
2056    /// default ignore_range_deletions is set to true on a DBMap (unless it is
2057    /// overridden in the config), so please use this function with caution.
2058    #[instrument(level = "trace", skip_all, err)]
2059    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2060        let mut iter = self.unbounded_iter().seek_to_first();
2061        let first_key = iter.next().map(|(k, _v)| k);
2062        let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
2063        if let Some((first_key, last_key)) = first_key.zip(last_key) {
2064            let mut batch = self.batch();
2065            batch.schedule_delete_range(self, &first_key, &last_key)?;
2066            batch.write()?;
2067        }
2068        Ok(())
2069    }
2070
2071    fn is_empty(&self) -> bool {
2072        self.safe_iter().next().is_none()
2073    }
2074
2075    /// Returns an unbounded iterator visiting each key-value pair in the map.
2076    /// This is potentially unsafe as it can perform a full table scan.
2077    fn unbounded_iter(&'a self) -> Self::Iterator {
2078        let db_iter = self
2079            .rocksdb
2080            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2081        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2082        Iter::new(
2083            self.cf.clone(),
2084            db_iter,
2085            _timer,
2086            _perf_ctx,
2087            bytes_scanned,
2088            keys_scanned,
2089            Some(self.db_metrics.clone()),
2090        )
2091    }
2092
2093    /// Returns an iterator visiting each key-value pair in the map. By providing
2094    /// bounds for the scan range, RocksDB can avoid unnecessary scans.
2095    /// The lower bound is inclusive, while the upper bound is exclusive.
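    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an already-opened `table: DBMap<u64, String>`
    /// (the table and bounds are illustrative only):
    ///
    /// ```ignore
    /// // Visits keys 10, 11, ..., 19.
    /// for (key, value) in table.iter_with_bounds(Some(10u64), Some(20u64)) {
    ///     println!("{key}: {value}");
    /// }
    /// ```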
2096    fn iter_with_bounds(
2097        &'a self,
2098        lower_bound: Option<K>,
2099        upper_bound: Option<K>,
2100    ) -> Self::Iterator {
2101        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2102        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2103        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2104        Iter::new(
2105            self.cf.clone(),
2106            db_iter,
2107            _timer,
2108            _perf_ctx,
2109            bytes_scanned,
2110            keys_scanned,
2111            Some(self.db_metrics.clone()),
2112        )
2113    }
2114
2115    /// Similar to `iter_with_bounds` but allows specifying
2116    /// inclusivity/exclusivity of ranges explicitly. TODO: find better name
2117    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2118        let readopts = self.create_read_options_with_range(range);
2119        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2120        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2121        Iter::new(
2122            self.cf.clone(),
2123            db_iter,
2124            _timer,
2125            _perf_ctx,
2126            bytes_scanned,
2127            keys_scanned,
2128            Some(self.db_metrics.clone()),
2129        )
2130    }
2131
2132    fn safe_iter(&'a self) -> Self::SafeIterator {
2133        let db_iter = self
2134            .rocksdb
2135            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2136        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2137        SafeIter::new(
2138            self.cf.clone(),
2139            db_iter,
2140            _timer,
2141            _perf_ctx,
2142            bytes_scanned,
2143            keys_scanned,
2144            Some(self.db_metrics.clone()),
2145        )
2146    }
2147
2148    fn safe_iter_with_bounds(
2149        &'a self,
2150        lower_bound: Option<K>,
2151        upper_bound: Option<K>,
2152    ) -> Self::SafeIterator {
2153        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2154        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2155        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2156        SafeIter::new(
2157            self.cf.clone(),
2158            db_iter,
2159            _timer,
2160            _perf_ctx,
2161            bytes_scanned,
2162            keys_scanned,
2163            Some(self.db_metrics.clone()),
2164        )
2165    }
2166
2167    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2168        let readopts = self.create_read_options_with_range(range);
2169        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2170        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2171        SafeIter::new(
2172            self.cf.clone(),
2173            db_iter,
2174            _timer,
2175            _perf_ctx,
2176            bytes_scanned,
2177            keys_scanned,
2178            Some(self.db_metrics.clone()),
2179        )
2180    }
2181
2182    fn keys(&'a self) -> Self::Keys {
2183        let mut db_iter = self
2184            .rocksdb
2185            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2186        db_iter.seek_to_first();
2187
2188        Keys::new(db_iter)
2189    }
2190
2191    fn values(&'a self) -> Self::Values {
2192        let mut db_iter = self
2193            .rocksdb
2194            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2195        db_iter.seek_to_first();
2196
2197        Values::new(db_iter)
2198    }
2199
2200    /// Returns a vector of raw values corresponding to the keys provided.
2201    #[instrument(level = "trace", skip_all, err)]
2202    fn multi_get_raw_bytes<J>(
2203        &self,
2204        keys: impl IntoIterator<Item = J>,
2205    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
2206    where
2207        J: Borrow<K>,
2208    {
2209        let results = self
2210            .multi_get_pinned(keys)?
2211            .into_iter()
2212            .map(|val| val.map(|v| v.to_vec()))
2213            .collect();
2214        Ok(results)
2215    }
2216
2217    /// Returns a vector of values corresponding to the keys provided.
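    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an already-opened `table: DBMap<u64, String>`
    /// (keys are illustrative only):
    ///
    /// ```ignore
    /// // `values[i]` is `None` if the i-th key is absent.
    /// let values: Vec<Option<String>> = table.multi_get([1u64, 2, 3])?;
    /// ```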
2218    #[instrument(level = "trace", skip_all, err)]
2219    fn multi_get<J>(
2220        &self,
2221        keys: impl IntoIterator<Item = J>,
2222    ) -> Result<Vec<Option<V>>, TypedStoreError>
2223    where
2224        J: Borrow<K>,
2225    {
2226        let results = self.multi_get_pinned(keys)?;
2227        let values_parsed: Result<Vec<_>, TypedStoreError> = results
2228            .into_iter()
2229            .map(|value_byte| match value_byte {
2230                Some(data) => Ok(Some(
2231                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2232                )),
2233                None => Ok(None),
2234            })
2235            .collect();
2236
2237        values_parsed
2238    }
2239
2240    /// Returns a vector of values for the keys provided, read in chunks of `chunk_size`.
2241    #[instrument(level = "trace", skip_all, err)]
2242    fn chunked_multi_get<J>(
2243        &self,
2244        keys: impl IntoIterator<Item = J>,
2245        chunk_size: usize,
2246    ) -> Result<Vec<Option<V>>, TypedStoreError>
2247    where
2248        J: Borrow<K>,
2249    {
2250        let cf = self.cf();
2251        let keys_bytes = keys
2252            .into_iter()
2253            .map(|k| (&cf, be_fix_int_ser(k.borrow()).unwrap()));
2254        let chunked_keys = keys_bytes.into_iter().chunks(chunk_size);
2255        let snapshot = self.snapshot()?;
2256        let mut results = vec![];
2257        for chunk in chunked_keys.into_iter() {
2258            let chunk_result = snapshot.multi_get_cf(chunk);
2259            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
2260                .into_iter()
2261                .map(|value_byte| {
2262                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
2263                    match value_byte {
2264                        Some(data) => Ok(Some(
2265                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2266                        )),
2267                        None => Ok(None),
2268                    }
2269                })
2270                .collect();
2271            results.extend(values_parsed?);
2272        }
2273        Ok(results)
2274    }
2275
2276    /// Convenience method for batch insertion
2277    #[instrument(level = "trace", skip_all, err)]
2278    fn multi_insert<J, U>(
2279        &self,
2280        key_val_pairs: impl IntoIterator<Item = (J, U)>,
2281    ) -> Result<(), Self::Error>
2282    where
2283        J: Borrow<K>,
2284        U: Borrow<V>,
2285    {
2286        let mut batch = self.batch();
2287        batch.insert_batch(self, key_val_pairs)?;
2288        batch.write()
2289    }
2290
2291    /// Convenience method for batch removal
2292    #[instrument(level = "trace", skip_all, err)]
2293    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2294    where
2295        J: Borrow<K>,
2296    {
2297        let mut batch = self.batch();
2298        batch.delete_batch(self, keys)?;
2299        batch.write()
2300    }
2301
2302    /// Try to catch up with primary when running as secondary
2303    #[instrument(level = "trace", skip_all, err)]
2304    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2305        self.rocksdb
2306            .try_catch_up_with_primary()
2307            .map_err(typed_store_err_from_rocks_err)
2308    }
2309}
2310
2311impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
2312where
2313    J: Borrow<K>,
2314    U: Borrow<V>,
2315    K: Serialize,
2316    V: Serialize,
2317{
2318    type Error = TypedStoreError;
2319
2320    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
2321    where
2322        T: Iterator<Item = (J, U)>,
2323    {
2324        let mut batch = self.batch();
2325        batch.insert_batch(self, iter)?;
2326        batch.write()
2327    }
2328
2329    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
2330        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
2331        let mut batch = self.batch();
2332        batch.insert_batch(self, slice_of_refs)?;
2333        batch.write()
2334    }
2335}
2336
2337pub fn read_size_from_env(var_name: &str) -> Option<usize> {
2338    env::var(var_name)
2339        .ok()?
2340        .parse::<usize>()
2341        .tap_err(|e| {
2342            warn!(
2343                "Env var {} does not contain valid usize integer: {}",
2344                var_name, e
2345            )
2346        })
2347        .ok()
2348}
2349
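/// Read/write options applied to reads and writes issued through a `DBMap`.
///
/// # Example
///
/// A minimal sketch of making range deletions visible to reads immediately; the
/// resulting options are passed wherever this crate accepts a `ReadWriteOptions`
/// (how they are attached to a table is not shown here):
///
/// ```ignore
/// let rw = ReadWriteOptions::default().set_ignore_range_deletions(false);
/// let read_opts = rw.readopts();
/// let write_opts = rw.writeopts();
/// ```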
2350#[derive(Clone, Debug)]
2351pub struct ReadWriteOptions {
2352    pub ignore_range_deletions: bool,
2353    // Whether to sync to disk on every write.
2354    sync_to_disk: bool,
2355}
2356
2357impl ReadWriteOptions {
2358    pub fn readopts(&self) -> ReadOptions {
2359        let mut readopts = ReadOptions::default();
2360        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
2361        readopts
2362    }
2363
2364    pub fn writeopts(&self) -> WriteOptions {
2365        let mut opts = WriteOptions::default();
2366        opts.set_sync(self.sync_to_disk);
2367        opts
2368    }
2369
2370    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
2371        self.ignore_range_deletions = ignore;
2372        self
2373    }
2374}
2375
2376impl Default for ReadWriteOptions {
2377    fn default() -> Self {
2378        Self {
2379            ignore_range_deletions: true,
2380            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
2381        }
2382    }
2383}
2384// TODO: refactor this into a builder pattern, where rocksdb::Options are
2385// generated after a call to build().
2386#[derive(Default, Clone)]
2387pub struct DBOptions {
2388    pub options: rocksdb::Options,
2389    pub rw_options: ReadWriteOptions,
2390}
2391
2392impl DBOptions {
2393    // Optimize lookup perf for tables where no scans are performed.
2394    // If a non-trivial number of values can be > 512B in size, it is beneficial to
2395    // also specify optimize_for_large_values_no_scan().
2396    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
2397        // NOTE: this overwrites the block options.
2398        self.options
2399            .optimize_for_point_lookup(block_cache_size_mb as u64);
2400        self
2401    }
2402
2403    // Optimize write and lookup perf for tables which are rarely scanned, and have
2404    // large values. https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
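    //
    // Example (illustrative sketch): store values larger than 1 MiB in blob
    // files, starting from the crate defaults:
    //
    //     let opts = default_db_options().optimize_for_large_values_no_scan(1 << 20);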
2405    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
2406        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
2407            info!("Large value blob storage optimization is disabled via env var.");
2408            return self;
2409        }
2410
2411        // Blob settings.
2412        self.options.set_enable_blob_files(true);
2413        self.options
2414            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
2415        self.options.set_enable_blob_gc(true);
2416        // Since each blob can have non-trivial size overhead, and compression does not
2417        // work across blobs, set a min blob size in bytes so that small
2418        // transactions and effects are kept in sst files.
2419        self.options.set_min_blob_size(min_blob_size);
2420
2421        // Increase write buffer size to 256MiB.
2422        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2423            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2424            * 1024
2425            * 1024;
2426        self.options.set_write_buffer_size(write_buffer_size);
2427        // Since large blobs are not in sst files, reduce the target file size and base
2428        // level target size.
2429        let target_file_size_base = 64 << 20;
2430        self.options
2431            .set_target_file_size_base(target_file_size_base);
2432        // Level 1 defaults to 64MiB * 4 ~ 256MiB.
2433        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2434            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2435        self.options
2436            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
2437
2438        self
2439    }
2440
2441    // Optimize tables with a mix of lookup and scan workloads.
2442    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
2443        self.options
2444            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
2445        self
2446    }
2447
2448    // Optimize DB receiving significant insertions.
2449    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
2450        self.options
2451            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
2452        self.options
2453            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
2454        self
2455    }
2456
2457    // Optimize tables receiving significant insertions.
2458    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
2459        // Increase write buffer size to 256MiB.
2460        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2461            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2462            * 1024
2463            * 1024;
2464        self.options.set_write_buffer_size(write_buffer_size);
2465        // Allow up to 6 write buffers before slowing down writes.
2466        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2467            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2468        self.options
2469            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2470        // Keep 1 write buffer so recent writes can be read from memory.
2471        self.options
2472            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2473
2474        // Set the level 0 file count compaction trigger (4 files by default, overridable via env var).
2475        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2476            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2477        self.options.set_level_zero_file_num_compaction_trigger(
2478            max_level_zero_file_num.try_into().unwrap(),
2479        );
2480        self.options.set_level_zero_slowdown_writes_trigger(
2481            (max_level_zero_file_num * 12).try_into().unwrap(),
2482        );
2483        self.options
2484            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2485
2486        // Increase sst file size to 128MiB.
2487        self.options.set_target_file_size_base(
2488            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2489                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2490                * 1024
2491                * 1024,
2492        );
2493
2494        // Level 1 target size = write buffer size * L0 trigger (256MiB * 4 ~ 1GiB by default).
2495        self.options
2496            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2497
2498        self
2499    }
2500
2501    // Optimize tables receiving significant insertions, without any deletions.
2502    // TODO: merge this function with optimize_for_write_throughput(), and use a
2503    // flag to indicate if deletion is received.
2504    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
2505        // Increase write buffer size to 256MiB.
2506        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2507            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2508            * 1024
2509            * 1024;
2510        self.options.set_write_buffer_size(write_buffer_size);
2511        // Allow up to 6 write buffers before slowing down writes.
2512        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2513            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2514        self.options
2515            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2516        // Keep 1 write buffer so recent writes can be read from memory.
2517        self.options
2518            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2519
2520        // Switch to universal compactions.
2521        self.options
2522            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
2523        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
2524        compaction_options.set_max_size_amplification_percent(10000);
2525        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
2526        self.options
2527            .set_universal_compaction_options(&compaction_options);
2528
2529        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2530            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
2531        self.options.set_level_zero_file_num_compaction_trigger(
2532            max_level_zero_file_num.try_into().unwrap(),
2533        );
2534        self.options.set_level_zero_slowdown_writes_trigger(
2535            (max_level_zero_file_num * 12).try_into().unwrap(),
2536        );
2537        self.options
2538            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2539
2540        // Increase sst file size to 128MiB.
2541        self.options.set_target_file_size_base(
2542            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2543                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2544                * 1024
2545                * 1024,
2546        );
2547
2548        // This should be a no-op for universal compaction but increasing it to be safe.
2549        self.options
2550            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2551
2552        self
2553    }
2554
2555    // Overrides the block options with different block cache size and block size.
2556    pub fn set_block_options(
2557        mut self,
2558        block_cache_size_mb: usize,
2559        block_size_bytes: usize,
2560    ) -> DBOptions {
2561        self.options
2562            .set_block_based_table_factory(&get_block_options(
2563                block_cache_size_mb,
2564                block_size_bytes,
2565            ));
2566        self
2567    }
2568
2569    // Disables write stalling and stopping based on pending compaction bytes.
2570    pub fn disable_write_throttling(mut self) -> DBOptions {
2571        self.options.set_soft_pending_compaction_bytes_limit(0);
2572        self.options.set_hard_pending_compaction_bytes_limit(0);
2573        self
2574    }
2575}
2576
2577/// Creates the default RocksDB options, to be used when no specific RocksDB
2578/// options are provided.
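///
/// # Example
///
/// A minimal sketch of tuning the defaults for a write-heavy table and opening
/// the database with them; the path, column family name, and `metric_conf`
/// value are illustrative assumptions:
///
/// ```ignore
/// // `metric_conf: MetricConf` is assumed to be constructed elsewhere.
/// let db_opts = default_db_options().optimize_for_write_throughput();
/// let db = open_cf_opts(
///     "/path/to/db",
///     Some(db_opts.options.clone()),
///     metric_conf,
///     &[("my_table", db_opts.options)],
/// )?;
/// ```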
2579pub fn default_db_options() -> DBOptions {
2580    let mut opt = rocksdb::Options::default();
2581
2582    // One common issue when running tests on Mac is that the default ulimit is too
2583    // low, leading to I/O errors such as "Too many open files". Raise the fd
2584    // limit to work around it.
2585    if let Some(limit) = fdlimit::raise_fd_limit() {
2586        // On Windows, raise_fd_limit returns None.
2587        opt.set_max_open_files((limit / 8) as i32);
2588    }
2589
2590    // The table cache is locked for updates, and this determines the number
2591    // of shards, i.e. 2^10. Increase it in case of lock contention.
2592    opt.set_table_cache_num_shard_bits(10);
2593
2594    // LSM compression settings
2595    opt.set_min_level_to_compress(2);
2596    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
2597    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
2598    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
2599
2600    // IOTA uses multiple RocksDB in a node, so total sizes of write buffers and WAL
2601    // can be higher than the limits below.
2602    //
2603    // RocksDB also exposes the option to configure total write buffer size across
2604    // multiple instances via `write_buffer_manager`. But the write buffer flush
2605    // policy (flushing the buffer receiving the next write) may not work well.
2606    // So sticking to per-db write buffer size limit for now.
2607    //
2608    // The environment variables are only meant to be emergency overrides. They may
2609    // go away in future. It is preferable to update the default value, or
2610    // override the option in code.
2611    opt.set_db_write_buffer_size(
2612        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
2613            * 1024
2614            * 1024,
2615    );
2616    opt.set_max_total_wal_size(
2617        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
2618    );
2619
2620    // Num threads for compactions and memtable flushes.
2621    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
2622
2623    opt.set_enable_pipelined_write(true);
2624
2625    // Increase block size to 16KiB.
2626    // https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md#indexes-and-filter-blocks
2627    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
2628
2629    // Set memtable bloomfilter.
2630    opt.set_memtable_prefix_bloom_ratio(0.02);
2631
2632    DBOptions {
2633        options: opt,
2634        rw_options: ReadWriteOptions::default(),
2635    }
2636}
2637
2638fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
2639    // Set options mostly similar to those used in optimize_for_point_lookup(),
2640    // except for a non-default binary and hash index, to hopefully reduce lookup
2641    // latencies without causing any regression for scanning, at the cost of slightly
2642    // more memory usage. https://github.com/facebook/rocksdb/blob/11cb6af6e5009c51794641905ca40ce5beec7fee/options/options.cc#L611-L621
2643    let mut block_options = BlockBasedOptions::default();
2644    // Overrides block size.
2645    block_options.set_block_size(block_size_bytes);
2646    // Configure a block cache.
2647    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
2648    // Set a bloom filter with a 1% false positive rate.
2649    block_options.set_bloom_filter(10.0, false);
2650    // From https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#caching-index-and-filter-blocks
2651    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
2652    block_options
2653}
2654
2655/// Opens a database with options, and a number of column families that are
2656/// created if they do not exist.
2657#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
2658pub fn open_cf<P: AsRef<Path>>(
2659    path: P,
2660    db_options: Option<rocksdb::Options>,
2661    metric_conf: MetricConf,
2662    opt_cfs: &[&str],
2663) -> Result<Arc<RocksDB>, TypedStoreError> {
2664    let options = db_options.unwrap_or_else(|| default_db_options().options);
2665    let column_descriptors: Vec<_> = opt_cfs
2666        .iter()
2667        .map(|name| (*name, options.clone()))
2668        .collect();
2669    open_cf_opts(
2670        path,
2671        Some(options.clone()),
2672        metric_conf,
2673        &column_descriptors[..],
2674    )
2675}
2676
2677fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
2678    // Customize database options
2679    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2680    options.create_if_missing(true);
2681    options.create_missing_column_families(true);
2682    options
2683}
2684
2685/// Opens a database with options, and a number of column families with
2686/// individual options that are created if they do not exist.
2687#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2688pub fn open_cf_opts<P: AsRef<Path>>(
2689    path: P,
2690    db_options: Option<rocksdb::Options>,
2691    metric_conf: MetricConf,
2692    opt_cfs: &[(&str, rocksdb::Options)],
2693) -> Result<Arc<RocksDB>, TypedStoreError> {
2694    let path = path.as_ref();
2695    // In the simulator, we intercept the wall clock in the test thread only. This
2696    // causes problems because rocksdb uses the simulated clock when creating
2697    // its background threads, but then those threads see the real wall clock
2698    // (because they are not the test thread), which causes rocksdb to panic.
2699    // The `nondeterministic` macro evaluates expressions in new threads, which
2700    // resolves the issue.
2701    //
2702    // This is a no-op in non-simulator builds.
2703
2704    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2705    nondeterministic!({
2706        let options = prepare_db_options(db_options);
2707        let rocksdb = {
2708            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2709                &options,
2710                path,
2711                cfs.into_iter()
2712                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2713            )
2714            .map_err(typed_store_err_from_rocks_err)?
2715        };
2716        Ok(Arc::new(RocksDB::DBWithThreadMode(
2717            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2718        )))
2719    })
2720}
2721
2722/// Opens a database with options, and a number of column families with
2723/// individual options that are created if they do not exist.
2724#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2725pub fn open_cf_opts_transactional<P: AsRef<Path>>(
2726    path: P,
2727    db_options: Option<rocksdb::Options>,
2728    metric_conf: MetricConf,
2729    opt_cfs: &[(&str, rocksdb::Options)],
2730) -> Result<Arc<RocksDB>, TypedStoreError> {
2731    let path = path.as_ref();
2732    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2733    // See comment above for explanation of why nondeterministic is necessary here.
2734    nondeterministic!({
2735        let options = prepare_db_options(db_options);
2736        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
2737            &options,
2738            path,
2739            cfs.into_iter()
2740                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2741        )
2742        .map_err(typed_store_err_from_rocks_err)?;
2743        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
2744            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2745        )))
2746    })
2747}
2748
2749/// Opens a database as a secondary instance with options, and a number of
2750/// column families with individual options that are created if they do not exist.
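///
/// # Example
///
/// A minimal sketch of opening a read-only secondary that trails a running
/// primary; the paths, column family name, and `metric_conf` value are
/// illustrative assumptions:
///
/// ```ignore
/// // `metric_conf: MetricConf` is assumed to be constructed elsewhere.
/// let db = open_cf_opts_secondary(
///     "/path/to/primary",
///     Some("/path/to/secondary"),
///     None,
///     metric_conf,
///     &[("my_table", default_db_options().options)],
/// )?;
/// // Replay the primary's recent writes before serving reads.
/// db.try_catch_up_with_primary()?;
/// ```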
2751pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2752    primary_path: P,
2753    secondary_path: Option<P>,
2754    db_options: Option<rocksdb::Options>,
2755    metric_conf: MetricConf,
2756    opt_cfs: &[(&str, rocksdb::Options)],
2757) -> Result<Arc<RocksDB>, TypedStoreError> {
2758    let primary_path = primary_path.as_ref();
2759    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2760    // See comment above for explanation of why nondeterministic is necessary here.
2761    nondeterministic!({
2762        // Customize database options
2763        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2764
2765        fdlimit::raise_fd_limit();
2766        // This is a requirement of RocksDB when opening as a secondary instance.
2767        options.set_max_open_files(-1);
2768
2769        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2770        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2771            .ok()
2772            .unwrap_or_default();
2773
2774        let default_db_options = default_db_options();
2775        // Add CFs not explicitly listed
2776        for cf_key in cfs.iter() {
2777            if !opt_cfs.contains_key(&cf_key[..]) {
2778                opt_cfs.insert(cf_key, default_db_options.options.clone());
2779            }
2780        }
2781
2782        let primary_path = primary_path.to_path_buf();
2783        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2784            let mut s = primary_path.clone();
2785            s.pop();
2786            s.push("SECONDARY");
2787            s.as_path().to_path_buf()
2788        });
2789
2790        let rocksdb = {
2791            options.create_if_missing(true);
2792            options.create_missing_column_families(true);
2793            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2794                &options,
2795                &primary_path,
2796                &secondary_path,
2797                opt_cfs
2798                    .iter()
2799                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2800            )
2801            .map_err(typed_store_err_from_rocks_err)?;
2802            db.try_catch_up_with_primary()
2803                .map_err(typed_store_err_from_rocks_err)?;
2804            db
2805        };
2806        Ok(Arc::new(RocksDB::DBWithThreadMode(
2807            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
2808        )))
2809    })
2810}
2811
2812pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2813    const DB_DEFAULT_CF_NAME: &str = "default";
2814
2815    let opts = rocksdb::Options::default();
2816    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2817        .map_err(|e| e.into())
2818        .map(|q| {
2819            q.iter()
2820                .filter_map(|s| {
2821                    // The `default` table is not used
2822                    if s != DB_DEFAULT_CF_NAME {
2823                        Some(s.clone())
2824                    } else {
2825                        None
2826                    }
2827                })
2828                .collect()
2829        })
2830}
2831
2832/// Serializes a value with big-endian, fixed-int encoding so that encoded keys sort in their natural order; RocksDB stores keys in byte order and supports seek on iterators, see `https://github.com/facebook/rocksdb/wiki/Iterator#introduction`
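///
/// # Example
///
/// A minimal sketch showing that the encoding preserves integer ordering
/// (the values are illustrative only):
///
/// ```ignore
/// let a = be_fix_int_ser(&1u64)?;
/// let b = be_fix_int_ser(&2u64)?;
/// // Lexicographic byte order matches numeric order.
/// assert!(a < b);
/// ```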
2833#[inline]
2834pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2835where
2836    S: ?Sized + serde::Serialize,
2837{
2838    bincode::DefaultOptions::new()
2839        .with_big_endian()
2840        .with_fixint_encoding()
2841        .serialize(t)
2842        .map_err(typed_store_err_from_bincode_err)
2843}
2844
2845#[derive(Clone)]
2846pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
2847impl DBMapTableConfigMap {
2848    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
2849        Self(map)
2850    }
2851
2852    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
2853        self.0.clone()
2854    }
2855}
2856
2857pub enum RocksDBAccessType {
2858    Primary,
2859    Secondary(Option<PathBuf>),
2860}
2861
2862pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
2863    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
2864}
2865
2866fn populate_missing_cfs(
2867    input_cfs: &[(&str, rocksdb::Options)],
2868    path: &Path,
2869) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2870    let mut cfs = vec![];
2871    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2872    let existing_cfs =
2873        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2874            .ok()
2875            .unwrap_or_default();
2876
2877    for cf_name in existing_cfs {
2878        if !input_cf_index.contains(&cf_name[..]) {
2879            cfs.push((cf_name, rocksdb::Options::default()));
2880        }
2881    }
2882    cfs.extend(
2883        input_cfs
2884            .iter()
2885            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2886    );
2887    Ok(cfs)
2888}
2889
2890/// Given a vec<u8>, compute the value which is one greater than the vector
2891/// interpreted as a big-endian number.
2892/// If the vector is already at its maximum, leave it unchanged.
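///
/// A short illustration of the saturating carry behavior (byte values are
/// illustrative only):
///
/// ```ignore
/// let mut v = vec![0x00, 0xff];
/// big_endian_saturating_add_one(&mut v);
/// assert_eq!(v, vec![0x01, 0x00]);
///
/// // Already at the maximum: unchanged.
/// let mut m = vec![0xff, 0xff];
/// big_endian_saturating_add_one(&mut m);
/// assert_eq!(m, vec![0xff, 0xff]);
/// ```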
2893fn big_endian_saturating_add_one(v: &mut [u8]) {
2894    if is_max(v) {
2895        return;
2896    }
2897    for i in (0..v.len()).rev() {
2898        if v[i] == u8::MAX {
2899            v[i] = 0;
2900        } else {
2901            v[i] += 1;
2902            break;
2903        }
2904    }
2905}
2906
2907/// Check if all the bytes in the vector are 0xFF
2908fn is_max(v: &[u8]) -> bool {
2909    v.iter().all(|&x| x == u8::MAX)
2910}
2911
2912// `clippy::manual_div_ceil` is raised by code expanded by the
2913// `uint::construct_uint!` macro, so it needs to be fixed upstream in `uint`.
2914#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
2915#[test]
2916fn test_helpers() {
2917    let v = vec![];
2918    assert!(is_max(&v));
2919
2920    fn check_add(v: Vec<u8>) {
2921        let mut v = v;
2922        let num = Num32::from_big_endian(&v);
2923        big_endian_saturating_add_one(&mut v);
2924        assert!(num + 1 == Num32::from_big_endian(&v));
2925    }
2926
2927    uint::construct_uint! {
2928        // 32 byte number
2929        struct Num32(4);
2930    }
2931
2932    let mut v = vec![255; 32];
2933    big_endian_saturating_add_one(&mut v);
2934    assert!(Num32::MAX == Num32::from_big_endian(&v));
2935
2936    check_add(vec![1; 32]);
2937    check_add(vec![6; 32]);
2938    check_add(vec![254; 32]);
2939
2940    // TBD: More tests coming with randomized arrays
2941}