typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod errors;
6pub(crate) mod iter;
7pub(crate) mod safe_iter;
8
9use std::{
10    borrow::Borrow,
11    collections::{BTreeMap, HashSet},
12    env,
13    ffi::CStr,
14    marker::PhantomData,
15    ops::{Bound, RangeBounds},
16    path::{Path, PathBuf},
17    sync::Arc,
18    time::Duration,
19};
20
21use bincode::Options;
22use collectable::TryExtend;
23use iota_macros::{fail_point, nondeterministic};
24use prometheus::{Histogram, HistogramTimer};
25use rocksdb::{
26    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
27    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
28    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
29    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
30    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
31};
32use serde::{Serialize, de::DeserializeOwned};
33use tap::TapFallible;
34use tokio::sync::oneshot;
35use tracing::{debug, error, info, instrument, warn};
36
37use self::iter::Iter;
38use crate::{
39    TypedStoreError,
40    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
41    rocks::{
42        errors::{
43            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
44            typed_store_err_from_rocks_err,
45        },
46        safe_iter::{SafeIter, SafeRevIter},
47    },
48    traits::{Map, TableSummary},
49};
50
51// Write buffer size per RocksDB instance can be set via the env var below.
52// If the env var is not set, use the default value in MiB.
53const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
54const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;
55
56// Write ahead log size per RocksDB instance can be set via the env var below.
57// If the env var is not set, use the default value in MiB.
58const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
59const DEFAULT_DB_WAL_SIZE: usize = 1024;
60
61// Environment variable to control behavior of write throughput optimized
62// tables.
63const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
64const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
65const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
66const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
67const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
68const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
69const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
70const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
71const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;
72
73// Set to 1 to disable blob storage for transactions and effects.
74const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
75
76const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
77
78// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
79// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
80const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
81    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
82
83const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";
84
85#[cfg(test)]
86mod tests;
87
88/// A helper macro to reopen multiple column families. The macro returns
89/// a tuple of DBMap structs in the same order that the column families
90/// are defined.
91///
92/// # Arguments
93///
94/// * `db` - a reference to a rocks DB object
95/// * `cf;<ty,ty>` - a comma separated list of column families to open. For each
96///   column family a concatenation of column family name (cf) and Key-Value
97///   <ty, ty> should be provided.
98///
99/// # Examples
100///
101/// We successfully open two different column families.
102/// ```
103/// use typed_store::reopen;
104/// use typed_store::rocks::*;
105/// use tempfile::tempdir;
106/// use prometheus::Registry;
107/// use std::sync::Arc;
108/// use typed_store::metrics::DBMetrics;
109/// use core::fmt::Error;
110///
111/// #[tokio::main]
112/// async fn main() -> Result<(), Error> {
113/// const FIRST_CF: &str = "First_CF";
114/// const SECOND_CF: &str = "Second_CF";
115///
116///
117/// /// Create the rocks database reference for the desired column families
118/// let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &[FIRST_CF, SECOND_CF]).unwrap();
119///
120/// /// Now simply open all the column families for their expected Key-Value types
121/// let (db_map_1, db_map_2) = reopen!(&rocks, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
122/// Ok(())
123/// }
124/// ```
125#[macro_export]
126macro_rules! reopen {
127    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
128        (
129            $(
130                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
131            ),*
132        )
133    };
134}
135
136/// Repeatedly attempt an Optimistic Transaction until it succeeds.
137/// Since many callsites (e.g. the consensus handler) cannot proceed in the case
138/// of failed writes, this will loop forever until the transaction succeeds.
139#[macro_export]
140macro_rules! retry_transaction {
141    ($transaction:expr) => {
142        retry_transaction!($transaction, Some(20))
143    };
144
145    (
146        $transaction:expr,
147        $max_retries:expr // should be an Option<int type>, None for unlimited
148        $(,)?
149
150    ) => {{
151        use std::time::Duration;
152
153        use rand::{
154            distributions::{Distribution, Uniform},
155            rngs::ThreadRng,
156        };
157        use tracing::{error, info};
158
159        let mut retries = 0;
160        let max_retries = $max_retries;
161        loop {
162            let status = $transaction;
163            match status {
164                Err(TypedStoreError::RetryableTransaction) => {
165                    retries += 1;
166                    // Randomized delay to help racing transactions get out of each other's way.
167                    let delay = {
168                        let mut rng = ThreadRng::default();
169                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
170                    };
171                    if let Some(max_retries) = max_retries {
172                        if retries > max_retries {
173                            error!(?max_retries, "max retries exceeded");
174                            break status;
175                        }
176                    }
177                    if retries > 10 {
178                        // TODO: monitoring needed?
179                        error!(?delay, ?retries, "excessive transaction retries...");
180                    } else {
181                        info!(
182                            ?delay,
183                            ?retries,
184                            "transaction write conflict detected, sleeping"
185                        );
186                    }
187                    std::thread::sleep(delay);
188                }
189                _ => break status,
190            }
191        }
192    }};
193}
194
195#[macro_export]
196macro_rules! retry_transaction_forever {
197    ($transaction:expr) => {
198        $crate::retry_transaction!($transaction, None)
199    };
200}
201
202#[derive(Debug)]
203pub struct DBWithThreadModeWrapper {
204    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
205    pub metric_conf: MetricConf,
206    pub db_path: PathBuf,
207}
208
209impl DBWithThreadModeWrapper {
210    fn new(
211        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
212        metric_conf: MetricConf,
213        db_path: PathBuf,
214    ) -> Self {
215        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
216        Self {
217            underlying,
218            metric_conf,
219            db_path,
220        }
221    }
222}
223
224impl Drop for DBWithThreadModeWrapper {
225    fn drop(&mut self) {
226        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
227    }
228}
229
230#[derive(Debug)]
231pub struct OptimisticTransactionDBWrapper {
232    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
233    pub metric_conf: MetricConf,
234    pub db_path: PathBuf,
235}
236
237impl OptimisticTransactionDBWrapper {
238    fn new(
239        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
240        metric_conf: MetricConf,
241        db_path: PathBuf,
242    ) -> Self {
243        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
244        Self {
245            underlying,
246            metric_conf,
247            db_path,
248        }
249    }
250}
251
252impl Drop for OptimisticTransactionDBWrapper {
253    fn drop(&mut self) {
254        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
255    }
256}
257
258/// Thin wrapper to unify interface across different db types
259#[derive(Debug)]
260pub enum RocksDB {
261    DBWithThreadMode(DBWithThreadModeWrapper),
262    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
263}
264
265macro_rules! delegate_call {
266    ($self:ident.$method:ident($($args:ident),*)) => {
267        match $self {
268            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
269            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
270        }
271    }
272}
273
274impl Drop for RocksDB {
275    fn drop(&mut self) {
276        delegate_call!(self.cancel_all_background_work(/* wait */ true))
277    }
278}
279
280impl RocksDB {
281    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
282        delegate_call!(self.get(key))
283    }
284
285    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
286        &'a self,
287        keys: I,
288        readopts: &ReadOptions,
289    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
290    where
291        K: AsRef<[u8]>,
292        I: IntoIterator<Item = (&'b W, K)>,
293        W: 'b + AsColumnFamilyRef,
294    {
295        delegate_call!(self.multi_get_cf_opt(keys, readopts))
296    }
297
298    pub fn batched_multi_get_cf_opt<I, K>(
299        &self,
300        cf: &impl AsColumnFamilyRef,
301        keys: I,
302        sorted_input: bool,
303        readopts: &ReadOptions,
304    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
305    where
306        I: IntoIterator<Item = K>,
307        K: AsRef<[u8]>,
308    {
309        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
310    }
311
312    pub fn property_int_value_cf(
313        &self,
314        cf: &impl AsColumnFamilyRef,
315        name: impl CStrLike,
316    ) -> Result<Option<u64>, rocksdb::Error> {
317        delegate_call!(self.property_int_value_cf(cf, name))
318    }
319
320    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
321        &self,
322        cf: &impl AsColumnFamilyRef,
323        key: K,
324        readopts: &ReadOptions,
325    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
326        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
327    }
328
329    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
330        delegate_call!(self.cf_handle(name))
331    }
332
333    pub fn create_cf<N: AsRef<str>>(
334        &self,
335        name: N,
336        opts: &rocksdb::Options,
337    ) -> Result<(), rocksdb::Error> {
338        delegate_call!(self.create_cf(name, opts))
339    }
340
341    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
342        delegate_call!(self.drop_cf(name))
343    }
344
345    pub fn delete_cf<K: AsRef<[u8]>>(
346        &self,
347        cf: &impl AsColumnFamilyRef,
348        key: K,
349        writeopts: &WriteOptions,
350    ) -> Result<(), rocksdb::Error> {
351        fail_point!("delete-cf-before");
352        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
353        fail_point!("delete-cf-after");
354        #[expect(clippy::let_and_return)]
355        ret
356    }
357
358    pub fn path(&self) -> &Path {
359        delegate_call!(self.path())
360    }
361
362    pub fn put_cf<K, V>(
363        &self,
364        cf: &impl AsColumnFamilyRef,
365        key: K,
366        value: V,
367        writeopts: &WriteOptions,
368    ) -> Result<(), rocksdb::Error>
369    where
370        K: AsRef<[u8]>,
371        V: AsRef<[u8]>,
372    {
373        fail_point!("put-cf-before");
374        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
375        fail_point!("put-cf-after");
376        #[expect(clippy::let_and_return)]
377        ret
378    }
379
380    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
381        &self,
382        cf: &impl AsColumnFamilyRef,
383        key: K,
384        readopts: &ReadOptions,
385    ) -> bool {
386        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
387    }
388
389    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
390        delegate_call!(self.try_catch_up_with_primary())
391    }
392
393    pub fn write(
394        &self,
395        batch: RocksDBBatch,
396        writeopts: &WriteOptions,
397    ) -> Result<(), TypedStoreError> {
398        fail_point!("batch-write-before");
399        let ret = match (self, batch) {
400            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
401                db.underlying
402                    .write_opt(batch, writeopts)
403                    .map_err(typed_store_err_from_rocks_err)?;
404                Ok(())
405            }
406            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
407                db.underlying
408                    .write_opt(batch, writeopts)
409                    .map_err(typed_store_err_from_rocks_err)?;
410                Ok(())
411            }
412            _ => Err(TypedStoreError::RocksDB(
413                "using invalid batch type for the database".to_string(),
414            )),
415        };
416        fail_point!("batch-write-after");
417        #[expect(clippy::let_and_return)]
418        ret
419    }
420
421    pub fn transaction_without_snapshot(
422        &self,
423    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
424        match self {
425            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
426            Self::DBWithThreadMode(_) => panic!(),
427        }
428    }
429
430    pub fn transaction(
431        &self,
432    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
433        match self {
434            Self::OptimisticTransactionDB(db) => {
435                let mut tx_opts = OptimisticTransactionOptions::new();
436                tx_opts.set_snapshot(true);
437
438                Ok(db
439                    .underlying
440                    .transaction_opt(&WriteOptions::default(), &tx_opts))
441            }
442            Self::DBWithThreadMode(_) => panic!(),
443        }
444    }
445
446    pub fn raw_iterator_cf<'a: 'b, 'b>(
447        &'a self,
448        cf_handle: &impl AsColumnFamilyRef,
449        readopts: ReadOptions,
450    ) -> RocksDBRawIter<'b> {
451        match self {
452            Self::DBWithThreadMode(db) => {
453                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
454            }
455            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
456                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
457            ),
458        }
459    }
460
461    pub fn iterator_cf<'a: 'b, 'b>(
462        &'a self,
463        cf_handle: &impl AsColumnFamilyRef,
464        readopts: ReadOptions,
465        mode: IteratorMode<'_>,
466    ) -> RocksDBIter<'b> {
467        match self {
468            Self::DBWithThreadMode(db) => {
469                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
470            }
471            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
472                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
473            ),
474        }
475    }
476
477    pub fn compact_range_cf<K: AsRef<[u8]>>(
478        &self,
479        cf: &impl AsColumnFamilyRef,
480        start: Option<K>,
481        end: Option<K>,
482    ) {
483        delegate_call!(self.compact_range_cf(cf, start, end))
484    }
485
486    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
487        &self,
488        cf: &impl AsColumnFamilyRef,
489        start: Option<K>,
490        end: Option<K>,
491    ) {
492        let opt = &mut CompactOptions::default();
493        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
494        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
495    }
496
497    pub fn flush(&self) -> Result<(), TypedStoreError> {
498        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
499    }
500
501    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
502        match self {
503            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
504            Self::OptimisticTransactionDB(d) => {
505                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
506            }
507        }
508    }
509
510    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
511        let checkpoint = match self {
512            Self::DBWithThreadMode(d) => {
513                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
514            }
515            Self::OptimisticTransactionDB(d) => {
516                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
517            }
518        };
519        checkpoint
520            .create_checkpoint(path)
521            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
522        Ok(())
523    }
524
525    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
526        delegate_call!(self.flush_cf(cf))
527    }
528
529    pub fn set_options_cf(
530        &self,
531        cf: &impl AsColumnFamilyRef,
532        opts: &[(&str, &str)],
533    ) -> Result<(), rocksdb::Error> {
534        delegate_call!(self.set_options_cf(cf, opts))
535    }
536
537    pub fn get_sampling_interval(&self) -> SamplingInterval {
538        match self {
539            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
540            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
541        }
542    }
543
544    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
545        match self {
546            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
547            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
548        }
549    }
550
551    pub fn write_sampling_interval(&self) -> SamplingInterval {
552        match self {
553            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
554            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
555        }
556    }
557
558    pub fn iter_sampling_interval(&self) -> SamplingInterval {
559        match self {
560            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
561            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
562        }
563    }
564
565    pub fn db_name(&self) -> String {
566        let name = match self {
567            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
568            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
569        };
570        if name.is_empty() {
571            self.default_db_name()
572        } else {
573            name.clone()
574        }
575    }
576
577    fn default_db_name(&self) -> String {
578        self.path()
579            .file_name()
580            .and_then(|f| f.to_str())
581            .unwrap_or("unknown")
582            .to_string()
583    }
584
585    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
586        delegate_call!(self.live_files())
587    }
588}
589
590// Check if the database is corrupted, and if so, panic.
591// If the corrupted key is not set, we set it to [1].
592pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
593    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
594
595    db.get(DB_CORRUPTED_KEY)
596        .map_err(|e| format!("Failed to open database: {e}"))
597        .and_then(|value| match value {
598            Some(v) if v[0] == 1 => Err(
599                "Database is corrupted, please remove the current database and start clean!"
600                    .to_string(),
601            ),
602            Some(_) => Ok(()),
603            None => db
604                .put(DB_CORRUPTED_KEY, [1])
605                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
606        })?;
607
608    Ok(())
609}
610
611pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
612    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
613}
614
615pub enum RocksDBSnapshot<'a> {
616    DBWithThreadMode(rocksdb::Snapshot<'a>),
617    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
618}
619
620impl<'a> RocksDBSnapshot<'a> {
621    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
622        &'a self,
623        keys: I,
624        readopts: ReadOptions,
625    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
626    where
627        K: AsRef<[u8]>,
628        I: IntoIterator<Item = (&'b W, K)>,
629        W: 'b + AsColumnFamilyRef,
630    {
631        match self {
632            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
633            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
634        }
635    }
636    pub fn multi_get_cf<'b: 'a, K, I, W>(
637        &'a self,
638        keys: I,
639    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
640    where
641        K: AsRef<[u8]>,
642        I: IntoIterator<Item = (&'b W, K)>,
643        W: 'b + AsColumnFamilyRef,
644    {
645        match self {
646            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
647            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
648        }
649    }
650}
651
652pub enum RocksDBBatch {
653    Regular(rocksdb::WriteBatch),
654    Transactional(rocksdb::WriteBatchWithTransaction<true>),
655}
656
657macro_rules! delegate_batch_call {
658    ($self:ident.$method:ident($($args:ident),*)) => {
659        match $self {
660            Self::Regular(b) => b.$method($($args),*),
661            Self::Transactional(b) => b.$method($($args),*),
662        }
663    }
664}
665
666impl RocksDBBatch {
667    fn size_in_bytes(&self) -> usize {
668        delegate_batch_call!(self.size_in_bytes())
669    }
670
671    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
672        delegate_batch_call!(self.delete_cf(cf, key))
673    }
674
675    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
676    where
677        K: AsRef<[u8]>,
678        V: AsRef<[u8]>,
679    {
680        delegate_batch_call!(self.put_cf(cf, key, value))
681    }
682
683    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
684    where
685        K: AsRef<[u8]>,
686        V: AsRef<[u8]>,
687    {
688        delegate_batch_call!(self.merge_cf(cf, key, value))
689    }
690
691    pub fn delete_range_cf<K: AsRef<[u8]>>(
692        &mut self,
693        cf: &impl AsColumnFamilyRef,
694        from: K,
695        to: K,
696    ) -> Result<(), TypedStoreError> {
697        match self {
698            Self::Regular(batch) => {
699                batch.delete_range_cf(cf, from, to);
700                Ok(())
701            }
702            Self::Transactional(_) => panic!(),
703        }
704    }
705}
706
707#[derive(Debug, Default)]
708pub struct MetricConf {
709    pub db_name: String,
710    pub read_sample_interval: SamplingInterval,
711    pub write_sample_interval: SamplingInterval,
712    pub iter_sample_interval: SamplingInterval,
713}
714
715impl MetricConf {
716    pub fn new(db_name: &str) -> Self {
717        if db_name.is_empty() {
718            error!("A meaningful db name should be used for metrics reporting.")
719        }
720        Self {
721            db_name: db_name.to_string(),
722            read_sample_interval: SamplingInterval::default(),
723            write_sample_interval: SamplingInterval::default(),
724            iter_sample_interval: SamplingInterval::default(),
725        }
726    }
727
728    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
729        Self {
730            db_name: self.db_name,
731            read_sample_interval: read_interval,
732            write_sample_interval: SamplingInterval::default(),
733            iter_sample_interval: SamplingInterval::default(),
734        }
735    }
736}
737const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
738const METRICS_ERROR: i64 = -1;
739
740/// An interface to a rocksDB database, keyed by a columnfamily
741#[derive(Clone, Debug)]
742pub struct DBMap<K, V> {
743    pub rocksdb: Arc<RocksDB>,
744    _phantom: PhantomData<fn(K) -> V>,
745    // the rocksDB ColumnFamily under which the map is stored
746    cf: String,
747    pub opts: ReadWriteOptions,
748    db_metrics: Arc<DBMetrics>,
749    get_sample_interval: SamplingInterval,
750    multiget_sample_interval: SamplingInterval,
751    write_sample_interval: SamplingInterval,
752    iter_sample_interval: SamplingInterval,
753    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
754}
755
756unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
757
758impl<K, V> DBMap<K, V> {
759    pub(crate) fn new(
760        db: Arc<RocksDB>,
761        opts: &ReadWriteOptions,
762        opt_cf: &str,
763        is_deprecated: bool,
764    ) -> Self {
765        let db_cloned = db.clone();
766        let db_metrics = DBMetrics::get();
767        let db_metrics_cloned = db_metrics.clone();
768        let cf = opt_cf.to_string();
769        let (sender, mut recv) = tokio::sync::oneshot::channel();
770        if !is_deprecated {
771            tokio::task::spawn(async move {
772                let mut interval =
773                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
774                loop {
775                    tokio::select! {
776                        _ = interval.tick() => {
777                            let db = db_cloned.clone();
778                            let cf = cf.clone();
779                            let db_metrics = db_metrics.clone();
780                            if let Err(e) = tokio::task::spawn_blocking(move || {
781                                Self::report_metrics(&db, &cf, &db_metrics);
782                            }).await {
783                                error!("Failed to log metrics with error: {}", e);
784                            }
785                        }
786                        _ = &mut recv => break,
787                    }
788                }
789                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
790            });
791        }
792        DBMap {
793            rocksdb: db.clone(),
794            opts: opts.clone(),
795            _phantom: PhantomData,
796            cf: opt_cf.to_string(),
797            db_metrics: db_metrics_cloned,
798            _metrics_task_cancel_handle: Arc::new(sender),
799            get_sample_interval: db.get_sampling_interval(),
800            multiget_sample_interval: db.multiget_sampling_interval(),
801            write_sample_interval: db.write_sampling_interval(),
802            iter_sample_interval: db.iter_sampling_interval(),
803        }
804    }
805
806    /// Opens a database from a path, with specific options and an optional
807    /// column family.
808    ///
809    /// This database is used to perform operations on single column family, and
810    /// parametrizes all operations in `DBBatch` when writing across column
811    /// families.
812    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
813    pub fn open<P: AsRef<Path>>(
814        path: P,
815        metric_conf: MetricConf,
816        db_options: Option<rocksdb::Options>,
817        opt_cf: Option<&str>,
818        rw_options: &ReadWriteOptions,
819    ) -> Result<Self, TypedStoreError> {
820        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
821        let cfs = vec![cf_key];
822        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
823        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
824    }
825
826    /// Reopens an open database as a typed map operating under a specific
827    /// column family. if no column family is passed, the default column
828    /// family is used.
829    ///
830    /// ```
831    /// use core::fmt::Error;
832    /// use std::sync::Arc;
833    ///
834    /// use prometheus::Registry;
835    /// use tempfile::tempdir;
836    /// use typed_store::{metrics::DBMetrics, rocks::*};
837    /// #[tokio::main]
838    /// async fn main() -> Result<(), Error> {
839    ///     /// Open the DB with all needed column families first.
840    ///     let rocks = open_cf(
841    ///         tempdir().unwrap(),
842    ///         None,
843    ///         MetricConf::default(),
844    ///         &["First_CF", "Second_CF"],
845    ///     )
846    ///     .unwrap();
847    ///     /// Attach the column families to specific maps.
848    ///     let db_cf_1 = DBMap::<u32, u32>::reopen(
849    ///         &rocks,
850    ///         Some("First_CF"),
851    ///         &ReadWriteOptions::default(),
852    ///         false,
853    ///     )
854    ///     .expect("Failed to open storage");
855    ///     let db_cf_2 = DBMap::<u32, u32>::reopen(
856    ///         &rocks,
857    ///         Some("Second_CF"),
858    ///         &ReadWriteOptions::default(),
859    ///         false,
860    ///     )
861    ///     .expect("Failed to open storage");
862    ///     Ok(())
863    /// }
864    /// ```
865    #[instrument(level = "debug", skip(db), err)]
866    pub fn reopen(
867        db: &Arc<RocksDB>,
868        opt_cf: Option<&str>,
869        rw_options: &ReadWriteOptions,
870        is_deprecated: bool,
871    ) -> Result<Self, TypedStoreError> {
872        let cf_key = opt_cf
873            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
874            .to_owned();
875
876        db.cf_handle(&cf_key)
877            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;
878
879        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
880    }
881
882    pub fn batch(&self) -> DBBatch {
883        let batch = match *self.rocksdb {
884            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
885            RocksDB::OptimisticTransactionDB(_) => {
886                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
887            }
888        };
889        DBBatch::new(
890            &self.rocksdb,
891            batch,
892            self.opts.writeopts(),
893            &self.db_metrics,
894            &self.write_sample_interval,
895        )
896    }
897
898    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
899        let from_buf = be_fix_int_ser(start)?;
900        let to_buf = be_fix_int_ser(end)?;
901        self.rocksdb
902            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
903        Ok(())
904    }
905
906    pub fn compact_range_raw(
907        &self,
908        cf_name: &str,
909        start: Vec<u8>,
910        end: Vec<u8>,
911    ) -> Result<(), TypedStoreError> {
912        let cf = self
913            .rocksdb
914            .cf_handle(cf_name)
915            .expect("compact range: column family does not exist");
916        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
917        Ok(())
918    }
919
920    pub fn compact_range_to_bottom<J: Serialize>(
921        &self,
922        start: &J,
923        end: &J,
924    ) -> Result<(), TypedStoreError> {
925        let from_buf = be_fix_int_ser(start)?;
926        let to_buf = be_fix_int_ser(end)?;
927        self.rocksdb
928            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
929        Ok(())
930    }
931
932    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
933        self.rocksdb
934            .cf_handle(&self.cf)
935            .expect("Map-keying column family should have been checked at DB creation")
936    }
937
938    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
939        self.rocksdb
940            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
941    }
942
943    pub fn flush(&self) -> Result<(), TypedStoreError> {
944        self.rocksdb
945            .flush_cf(&self.cf())
946            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
947    }
948
949    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
950        self.rocksdb.set_options_cf(&self.cf(), opts)
951    }
952
953    fn get_int_property(
954        rocksdb: &RocksDB,
955        cf: &impl AsColumnFamilyRef,
956        property_name: &std::ffi::CStr,
957    ) -> Result<i64, TypedStoreError> {
958        match rocksdb.property_int_value_cf(cf, property_name) {
959            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
960            Ok(None) => Ok(0),
961            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
962        }
963    }
964
965    /// Returns a vector of raw values corresponding to the keys provided.
966    fn multi_get_pinned<J>(
967        &self,
968        keys: impl IntoIterator<Item = J>,
969    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
970    where
971        J: Borrow<K>,
972        K: Serialize,
973    {
974        let _timer = self
975            .db_metrics
976            .op_metrics
977            .rocksdb_multiget_latency_seconds
978            .with_label_values(&[&self.cf])
979            .start_timer();
980        let perf_ctx = if self.multiget_sample_interval.sample() {
981            Some(RocksDBPerfContext)
982        } else {
983            None
984        };
985        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
986            .into_iter()
987            .map(|k| be_fix_int_ser(k.borrow()))
988            .collect();
989        let results: Result<Vec<_>, TypedStoreError> = self
990            .rocksdb
991            .batched_multi_get_cf_opt(
992                &self.cf(),
993                keys_bytes?,
994                // sorted_keys=
995                false,
996                &self.opts.readopts(),
997            )
998            .into_iter()
999            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
1000            .collect();
1001        let entries = results?;
1002        let entry_size = entries
1003            .iter()
1004            .flatten()
1005            .map(|entry| entry.len())
1006            .sum::<usize>();
1007        self.db_metrics
1008            .op_metrics
1009            .rocksdb_multiget_bytes
1010            .with_label_values(&[&self.cf])
1011            .observe(entry_size as f64);
1012        if perf_ctx.is_some() {
1013            self.db_metrics
1014                .read_perf_ctx_metrics
1015                .report_metrics(&self.cf);
1016        }
1017        Ok(entries)
1018    }
1019
1020    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
1021        let Some(cf) = rocksdb.cf_handle(cf_name) else {
1022            tracing::warn!(
1023                "unable to report metrics for cf {cf_name:?} in db {:?}",
1024                rocksdb.db_name()
1025            );
1026            return;
1027        };
1028
1029        db_metrics
1030            .cf_metrics
1031            .rocksdb_total_sst_files_size
1032            .with_label_values(&[cf_name])
1033            .set(
1034                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
1035                    .unwrap_or(METRICS_ERROR),
1036            );
1037        db_metrics
1038            .cf_metrics
1039            .rocksdb_total_blob_files_size
1040            .with_label_values(&[cf_name])
1041            .set(
1042                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
1043                    .unwrap_or(METRICS_ERROR),
1044            );
1045        // 7 is the default number of levels in RocksDB. If we ever change the number of
1046        // levels using `set_num_levels`, we need to update here as well. Note
1047        // that there isn't an API to query the DB to get the number of levels (yet).
1048        let total_num_files: i64 = (0..=6)
1049            .map(|level| {
1050                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
1051                    .unwrap_or(METRICS_ERROR)
1052            })
1053            .sum();
1054        db_metrics
1055            .cf_metrics
1056            .rocksdb_total_num_files
1057            .with_label_values(&[cf_name])
1058            .set(total_num_files);
1059        db_metrics
1060            .cf_metrics
1061            .rocksdb_num_level0_files
1062            .with_label_values(&[cf_name])
1063            .set(
1064                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
1065                    .unwrap_or(METRICS_ERROR),
1066            );
1067        db_metrics
1068            .cf_metrics
1069            .rocksdb_current_size_active_mem_tables
1070            .with_label_values(&[cf_name])
1071            .set(
1072                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
1073                    .unwrap_or(METRICS_ERROR),
1074            );
1075        db_metrics
1076            .cf_metrics
1077            .rocksdb_size_all_mem_tables
1078            .with_label_values(&[cf_name])
1079            .set(
1080                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
1081                    .unwrap_or(METRICS_ERROR),
1082            );
1083        db_metrics
1084            .cf_metrics
1085            .rocksdb_num_snapshots
1086            .with_label_values(&[cf_name])
1087            .set(
1088                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
1089                    .unwrap_or(METRICS_ERROR),
1090            );
1091        db_metrics
1092            .cf_metrics
1093            .rocksdb_oldest_snapshot_time
1094            .with_label_values(&[cf_name])
1095            .set(
1096                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
1097                    .unwrap_or(METRICS_ERROR),
1098            );
1099        db_metrics
1100            .cf_metrics
1101            .rocksdb_actual_delayed_write_rate
1102            .with_label_values(&[cf_name])
1103            .set(
1104                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
1105                    .unwrap_or(METRICS_ERROR),
1106            );
1107        db_metrics
1108            .cf_metrics
1109            .rocksdb_is_write_stopped
1110            .with_label_values(&[cf_name])
1111            .set(
1112                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
1113                    .unwrap_or(METRICS_ERROR),
1114            );
1115        db_metrics
1116            .cf_metrics
1117            .rocksdb_block_cache_capacity
1118            .with_label_values(&[cf_name])
1119            .set(
1120                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
1121                    .unwrap_or(METRICS_ERROR),
1122            );
1123        db_metrics
1124            .cf_metrics
1125            .rocksdb_block_cache_usage
1126            .with_label_values(&[cf_name])
1127            .set(
1128                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
1129                    .unwrap_or(METRICS_ERROR),
1130            );
1131        db_metrics
1132            .cf_metrics
1133            .rocksdb_block_cache_pinned_usage
1134            .with_label_values(&[cf_name])
1135            .set(
1136                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
1137                    .unwrap_or(METRICS_ERROR),
1138            );
1139        db_metrics
1140            .cf_metrics
1141            .rocksdb_estimate_table_readers_mem
1142            .with_label_values(&[cf_name])
1143            .set(
1144                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
1145                    .unwrap_or(METRICS_ERROR),
1146            );
1147        db_metrics
1148            .cf_metrics
1149            .rocksdb_estimated_num_keys
1150            .with_label_values(&[cf_name])
1151            .set(
1152                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
1153                    .unwrap_or(METRICS_ERROR),
1154            );
1155        db_metrics
1156            .cf_metrics
1157            .rocksdb_num_immutable_mem_tables
1158            .with_label_values(&[cf_name])
1159            .set(
1160                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
1161                    .unwrap_or(METRICS_ERROR),
1162            );
1163        db_metrics
1164            .cf_metrics
1165            .rocksdb_mem_table_flush_pending
1166            .with_label_values(&[cf_name])
1167            .set(
1168                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
1169                    .unwrap_or(METRICS_ERROR),
1170            );
1171        db_metrics
1172            .cf_metrics
1173            .rocksdb_compaction_pending
1174            .with_label_values(&[cf_name])
1175            .set(
1176                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
1177                    .unwrap_or(METRICS_ERROR),
1178            );
1179        db_metrics
1180            .cf_metrics
1181            .rocksdb_estimate_pending_compaction_bytes
1182            .with_label_values(&[cf_name])
1183            .set(
1184                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
1185                    .unwrap_or(METRICS_ERROR),
1186            );
1187        db_metrics
1188            .cf_metrics
1189            .rocksdb_num_running_compactions
1190            .with_label_values(&[cf_name])
1191            .set(
1192                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1193                    .unwrap_or(METRICS_ERROR),
1194            );
1195        db_metrics
1196            .cf_metrics
1197            .rocksdb_num_running_flushes
1198            .with_label_values(&[cf_name])
1199            .set(
1200                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1201                    .unwrap_or(METRICS_ERROR),
1202            );
1203        db_metrics
1204            .cf_metrics
1205            .rocksdb_estimate_oldest_key_time
1206            .with_label_values(&[cf_name])
1207            .set(
1208                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1209                    .unwrap_or(METRICS_ERROR),
1210            );
1211        db_metrics
1212            .cf_metrics
1213            .rocksdb_background_errors
1214            .with_label_values(&[cf_name])
1215            .set(
1216                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1217                    .unwrap_or(METRICS_ERROR),
1218            );
1219        db_metrics
1220            .cf_metrics
1221            .rocksdb_base_level
1222            .with_label_values(&[cf_name])
1223            .set(
1224                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1225                    .unwrap_or(METRICS_ERROR),
1226            );
1227    }
1228
1229    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1230        DBTransaction::new(&self.rocksdb)
1231    }
1232
1233    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1234        DBTransaction::new_without_snapshot(&self.rocksdb)
1235    }
1236
1237    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1238        self.rocksdb.checkpoint(path)
1239    }
1240
1241    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
1242        Ok(self.rocksdb.snapshot())
1243    }
1244
1245    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
1246        let mut num_keys = 0;
1247        let mut key_bytes_total = 0;
1248        let mut value_bytes_total = 0;
1249        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1250        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1251        let iter = self.iterator_cf().map(Result::unwrap);
1252        for (key, value) in iter {
1253            num_keys += 1;
1254            key_bytes_total += key.len();
1255            value_bytes_total += value.len();
1256            key_hist.record(key.len() as u64)?;
1257            value_hist.record(value.len() as u64)?;
1258        }
1259        Ok(TableSummary {
1260            num_keys,
1261            key_bytes_total,
1262            value_bytes_total,
1263            key_hist,
1264            value_hist,
1265        })
1266    }
1267
1268    // Creates metrics and context for tracking an iterator usage and performance.
1269    fn create_iter_context(
1270        &self,
1271    ) -> (
1272        Option<HistogramTimer>,
1273        Option<Histogram>,
1274        Option<Histogram>,
1275        Option<RocksDBPerfContext>,
1276    ) {
1277        let timer = self
1278            .db_metrics
1279            .op_metrics
1280            .rocksdb_iter_latency_seconds
1281            .with_label_values(&[&self.cf])
1282            .start_timer();
1283        let bytes_scanned = self
1284            .db_metrics
1285            .op_metrics
1286            .rocksdb_iter_bytes
1287            .with_label_values(&[&self.cf]);
1288        let keys_scanned = self
1289            .db_metrics
1290            .op_metrics
1291            .rocksdb_iter_keys
1292            .with_label_values(&[&self.cf]);
1293        let perf_ctx = if self.iter_sample_interval.sample() {
1294            Some(RocksDBPerfContext)
1295        } else {
1296            None
1297        };
1298        (
1299            Some(timer),
1300            Some(bytes_scanned),
1301            Some(keys_scanned),
1302            perf_ctx,
1303        )
1304    }
1305
1306    // Creates a RocksDB read option with specified lower and upper bounds.
1307    /// Lower bound is inclusive, and upper bound is exclusive.
1308    fn create_read_options_with_bounds(
1309        &self,
1310        lower_bound: Option<K>,
1311        upper_bound: Option<K>,
1312    ) -> ReadOptions
1313    where
1314        K: Serialize,
1315    {
1316        let mut readopts = self.opts.readopts();
1317        if let Some(lower_bound) = lower_bound {
1318            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
1319            readopts.set_iterate_lower_bound(key_buf);
1320        }
1321        if let Some(upper_bound) = upper_bound {
1322            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
1323            readopts.set_iterate_upper_bound(key_buf);
1324        }
1325        readopts
1326    }
1327
1328    /// Creates a safe reversed iterator with optional bounds.
1329    /// Upper bound is included.
1330    pub fn reversed_safe_iter_with_bounds(
1331        &self,
1332        lower_bound: Option<K>,
1333        upper_bound: Option<K>,
1334    ) -> Result<SafeRevIter<'_, K, V>, TypedStoreError>
1335    where
1336        K: Serialize + DeserializeOwned,
1337        V: Serialize + DeserializeOwned,
1338    {
1339        let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1340        let readopts = self.create_read_options_with_range((
1341            lower_bound
1342                .as_ref()
1343                .map(Bound::Included)
1344                .unwrap_or(Bound::Unbounded),
1345            upper_bound
1346                .as_ref()
1347                .map(Bound::Included)
1348                .unwrap_or(Bound::Unbounded),
1349        ));
1350
1351        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
1352        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1353        let iter = SafeIter::new(
1354            self.cf.clone(),
1355            db_iter,
1356            _timer,
1357            _perf_ctx,
1358            bytes_scanned,
1359            keys_scanned,
1360            Some(self.db_metrics.clone()),
1361        );
1362        Ok(SafeRevIter::new(iter, upper_bound_key.transpose()?))
1363    }
1364
1365    // Creates a RocksDB read option with lower and upper bounds set corresponding
1366    // to `range`.
1367    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
1368    where
1369        K: Serialize,
1370    {
1371        let mut readopts = self.opts.readopts();
1372
1373        let lower_bound = range.start_bound();
1374        let upper_bound = range.end_bound();
1375
1376        match lower_bound {
1377            Bound::Included(lower_bound) => {
1378                // Rocksdb lower bound is inclusive by default so nothing to do
1379                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1380                readopts.set_iterate_lower_bound(key_buf);
1381            }
1382            Bound::Excluded(lower_bound) => {
1383                let mut key_buf =
1384                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1385
1386                // Since we want exclusive, we need to increment the key to exclude the previous
1387                big_endian_saturating_add_one(&mut key_buf);
1388                readopts.set_iterate_lower_bound(key_buf);
1389            }
1390            Bound::Unbounded => (),
1391        };
1392
1393        match upper_bound {
1394            Bound::Included(upper_bound) => {
1395                let mut key_buf =
1396                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1397
1398                // If the key is already at the limit, there's nowhere else to go, so no upper
1399                // bound
1400                if !is_max(&key_buf) {
1401                    // Since we want exclusive, we need to increment the key to get the upper bound
1402                    big_endian_saturating_add_one(&mut key_buf);
1403                    readopts.set_iterate_upper_bound(key_buf);
1404                }
1405            }
1406            Bound::Excluded(upper_bound) => {
1407                // Rocksdb upper bound is inclusive by default so nothing to do
1408                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1409                readopts.set_iterate_upper_bound(key_buf);
1410            }
1411            Bound::Unbounded => (),
1412        };
1413
1414        readopts
1415    }
1416}
1417
1418/// Provides a mutable struct to form a collection of database write operations,
1419/// and execute them.
1420///
1421/// Batching write and delete operations is faster than performing them one by
1422/// one and ensures their atomicity,  ie. they are all written or none is.
1423/// This is also true of operations across column families in the same database.
1424///
1425/// Serializations / Deserialization, and naming of column families is performed
1426/// by passing a DBMap<K,V> with each operation.
1427///
1428/// ```
1429/// use core::fmt::Error;
1430/// use std::sync::Arc;
1431///
1432/// use prometheus::Registry;
1433/// use tempfile::tempdir;
1434/// use typed_store::{Map, metrics::DBMetrics, rocks::*};
1435///
1436/// #[tokio::main]
1437/// async fn main() -> Result<(), Error> {
1438///     let rocks = open_cf(
1439///         tempfile::tempdir().unwrap(),
1440///         None,
1441///         MetricConf::default(),
1442///         &["First_CF", "Second_CF"],
1443///     )
1444///     .unwrap();
1445///
1446///     let db_cf_1 = DBMap::reopen(
1447///         &rocks,
1448///         Some("First_CF"),
1449///         &ReadWriteOptions::default(),
1450///         false,
1451///     )
1452///     .expect("Failed to open storage");
1453///     let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1454///
1455///     let db_cf_2 = DBMap::reopen(
1456///         &rocks,
1457///         Some("Second_CF"),
1458///         &ReadWriteOptions::default(),
1459///         false,
1460///     )
1461///     .expect("Failed to open storage");
1462///     let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1463///
1464///     let mut batch = db_cf_1.batch();
1465///     batch
1466///         .insert_batch(&db_cf_1, keys_vals_1.clone())
1467///         .expect("Failed to batch insert")
1468///         .insert_batch(&db_cf_2, keys_vals_2.clone())
1469///         .expect("Failed to batch insert");
1470///
1471///     let _ = batch.write().expect("Failed to execute batch");
1472///     for (k, v) in keys_vals_1 {
1473///         let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1474///         assert_eq!(Some(v), val);
1475///     }
1476///
1477///     for (k, v) in keys_vals_2 {
1478///         let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1479///         assert_eq!(Some(v), val);
1480///     }
1481///     Ok(())
1482/// }
1483/// ```
1484pub struct DBBatch {
1485    rocksdb: Arc<RocksDB>,
1486    batch: RocksDBBatch,
1487    opts: WriteOptions,
1488    db_metrics: Arc<DBMetrics>,
1489    write_sample_interval: SamplingInterval,
1490}
1491
1492impl DBBatch {
1493    /// Create a new batch associated with a DB reference.
1494    ///
1495    /// Use `open_cf` to get the DB reference or an existing open database.
1496    pub fn new(
1497        dbref: &Arc<RocksDB>,
1498        batch: RocksDBBatch,
1499        opts: WriteOptions,
1500        db_metrics: &Arc<DBMetrics>,
1501        write_sample_interval: &SamplingInterval,
1502    ) -> Self {
1503        DBBatch {
1504            rocksdb: dbref.clone(),
1505            batch,
1506            opts,
1507            db_metrics: db_metrics.clone(),
1508            write_sample_interval: write_sample_interval.clone(),
1509        }
1510    }
1511
1512    /// Consume the batch and write its operations to the database
1513    #[instrument(level = "trace", skip_all, err)]
1514    pub fn write(self) -> Result<(), TypedStoreError> {
1515        let db_name = self.rocksdb.db_name();
1516        let timer = self
1517            .db_metrics
1518            .op_metrics
1519            .rocksdb_batch_commit_latency_seconds
1520            .with_label_values(&[&db_name])
1521            .start_timer();
1522        let batch_size = self.batch.size_in_bytes();
1523
1524        let perf_ctx = if self.write_sample_interval.sample() {
1525            Some(RocksDBPerfContext)
1526        } else {
1527            None
1528        };
1529        self.rocksdb.write(self.batch, &self.opts)?;
1530        self.db_metrics
1531            .op_metrics
1532            .rocksdb_batch_commit_bytes
1533            .with_label_values(&[&db_name])
1534            .observe(batch_size as f64);
1535
1536        if perf_ctx.is_some() {
1537            self.db_metrics
1538                .write_perf_ctx_metrics
1539                .report_metrics(&db_name);
1540        }
1541        let elapsed = timer.stop_and_record();
1542        if elapsed > 1.0 {
1543            warn!(?elapsed, ?db_name, "very slow batch write");
1544            self.db_metrics
1545                .op_metrics
1546                .rocksdb_very_slow_batch_writes_count
1547                .with_label_values(&[&db_name])
1548                .inc();
1549            self.db_metrics
1550                .op_metrics
1551                .rocksdb_very_slow_batch_writes_duration_ms
1552                .with_label_values(&[&db_name])
1553                .inc_by((elapsed * 1000.0) as u64);
1554        }
1555        Ok(())
1556    }
1557
1558    pub fn size_in_bytes(&self) -> usize {
1559        self.batch.size_in_bytes()
1560    }
1561}
1562
1563impl DBBatch {
1564    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1565        &mut self,
1566        db: &DBMap<K, V>,
1567        purged_vals: impl IntoIterator<Item = J>,
1568    ) -> Result<(), TypedStoreError> {
1569        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1570            return Err(TypedStoreError::CrossDBBatch);
1571        }
1572
1573        purged_vals
1574            .into_iter()
1575            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1576                let k_buf = be_fix_int_ser(k.borrow())?;
1577                self.batch.delete_cf(&db.cf(), k_buf);
1578
1579                Ok(())
1580            })?;
1581        Ok(())
1582    }
1583
1584    /// Deletes a range of keys between `from` (inclusive) and `to`
1585    /// (exclusive) by writing a range delete tombstone in the db map.
1586    /// If the DBMap is configured with ignore_range_deletions set to false,
1587    /// the effect of this write is visible immediately, i.e. you won't
1588    /// see old values when you do a lookup or scan. But if it is configured
1589    /// with ignore_range_deletions set to true, the old values remain visible
1590    /// until compaction actually deletes them, which happens some time later. By
1591    /// default, ignore_range_deletions is set to true on a DBMap (unless it is
1592    /// overridden in the config), so please use this function with caution.
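    ///
    /// # Example
    ///
    /// A minimal, hedged sketch of scheduling a range delete through a batch;
    /// it assumes `db` is an already open `DBMap<u32, String>` (the name and
    /// types are illustrative, not part of this API).
    ///
    /// ```ignore
    /// let mut batch = db.batch();
    /// // Tombstones keys 10..20; the upper bound 20 is excluded.
    /// batch.schedule_delete_range(&db, &10, &20)?;
    /// batch.write()?;
    /// ```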
1593    pub fn schedule_delete_range<K: Serialize, V>(
1594        &mut self,
1595        db: &DBMap<K, V>,
1596        from: &K,
1597        to: &K,
1598    ) -> Result<(), TypedStoreError> {
1599        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1600            return Err(TypedStoreError::CrossDBBatch);
1601        }
1602
1603        let from_buf = be_fix_int_ser(from)?;
1604        let to_buf = be_fix_int_ser(to)?;
1605
1606        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
1607        Ok(())
1608    }
1609
1610    /// Inserts a batch of (key, value) pairs given as an iterator.
1611    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1612        &mut self,
1613        db: &DBMap<K, V>,
1614        new_vals: impl IntoIterator<Item = (J, U)>,
1615    ) -> Result<&mut Self, TypedStoreError> {
1616        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1617            return Err(TypedStoreError::CrossDBBatch);
1618        }
1619        let mut total = 0usize;
1620        new_vals
1621            .into_iter()
1622            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1623                let k_buf = be_fix_int_ser(k.borrow())?;
1624                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1625                total += k_buf.len() + v_buf.len();
1626                self.batch.put_cf(&db.cf(), k_buf, v_buf);
1627                Ok(())
1628            })?;
1629        self.db_metrics
1630            .op_metrics
1631            .rocksdb_batch_put_bytes
1632            .with_label_values(&[&db.cf])
1633            .observe(total as f64);
1634        Ok(self)
1635    }
1636}
1637
1638pub struct DBTransaction<'a> {
1639    rocksdb: Arc<RocksDB>,
1640    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
1641}
1642
1643impl<'a> DBTransaction<'a> {
1644    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1645        Ok(Self {
1646            rocksdb: db.clone(),
1647            transaction: db.transaction()?,
1648        })
1649    }
1650
1651    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1652        Ok(Self {
1653            rocksdb: db.clone(),
1654            transaction: db.transaction_without_snapshot()?,
1655        })
1656    }
1657
1658    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1659        &mut self,
1660        db: &DBMap<K, V>,
1661        new_vals: impl IntoIterator<Item = (J, U)>,
1662    ) -> Result<&mut Self, TypedStoreError> {
1663        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1664            return Err(TypedStoreError::CrossDBBatch);
1665        }
1666
1667        new_vals
1668            .into_iter()
1669            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1670                let k_buf = be_fix_int_ser(k.borrow())?;
1671                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1672                self.transaction
1673                    .put_cf(&db.cf(), k_buf, v_buf)
1674                    .map_err(typed_store_err_from_rocks_err)?;
1675                Ok(())
1676            })?;
1677        Ok(self)
1678    }
1679
1680    /// Deletes a set of keys given as an iterator
1681    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1682        &mut self,
1683        db: &DBMap<K, V>,
1684        purged_vals: impl IntoIterator<Item = J>,
1685    ) -> Result<&mut Self, TypedStoreError> {
1686        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1687            return Err(TypedStoreError::CrossDBBatch);
1688        }
1689        purged_vals
1690            .into_iter()
1691            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1692                let k_buf = be_fix_int_ser(k.borrow())?;
1693                self.transaction
1694                    .delete_cf(&db.cf(), k_buf)
1695                    .map_err(typed_store_err_from_rocks_err)?;
1696                Ok(())
1697            })?;
1698        Ok(self)
1699    }
1700
1701    pub fn snapshot(
1702        &self,
1703    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
1704    {
1705        self.transaction.snapshot()
1706    }
1707
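    /// Reads `key` and registers it with the transaction's conflict detection,
    /// so that the commit fails (and can be retried) if another writer changes
    /// the key before the transaction commits.
    ///
    /// # Example
    ///
    /// A hedged read-modify-write sketch; `counters` is an illustrative
    /// `DBMap<String, u64>` living in the same database as the transaction.
    ///
    /// ```ignore
    /// let mut tx = DBTransaction::new(&rocks)?;
    /// let current = tx
    ///     .get_for_update(&counters, &"hits".to_string())?
    ///     .unwrap_or(0);
    /// tx.insert_batch(&counters, [("hits".to_string(), current + 1)])?;
    /// tx.commit()?;
    /// ```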
1708    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
1709        &self,
1710        db: &DBMap<K, V>,
1711        key: &K,
1712    ) -> Result<Option<V>, TypedStoreError> {
1713        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1714            return Err(TypedStoreError::CrossDBBatch);
1715        }
1716        let k_buf = be_fix_int_ser(key)?;
1717        match self
1718            .transaction
1719            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
1720            .map_err(typed_store_err_from_rocks_err)?
1721        {
1722            Some(data) => Ok(Some(
1723                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1724            )),
1725            None => Ok(None),
1726        }
1727    }
1728
1729    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
1730        &self,
1731        db: &DBMap<K, V>,
1732        key: &K,
1733    ) -> Result<Option<V>, TypedStoreError> {
1734        let key_buf = be_fix_int_ser(key)?;
1735        self.transaction
1736            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
1737            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
1738            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
1739    }
1740
1741    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
1742        &self,
1743        db: &DBMap<K, V>,
1744        keys: impl IntoIterator<Item = J>,
1745    ) -> Result<Vec<Option<V>>, TypedStoreError> {
1746        let cf = db.cf();
1747        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
1748            .into_iter()
1749            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
1750            .collect();
1751
1752        let results = self
1753            .transaction
1754            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());
1755
1756        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1757            .into_iter()
1758            .map(
1759                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
1760                    Some(data) => Ok(Some(
1761                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1762                    )),
1763                    None => Ok(None),
1764                },
1765            )
1766            .collect();
1767
1768        values_parsed
1769    }
1770
1771    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
1772        &'a self,
1773        db: &DBMap<K, V>,
1774    ) -> Iter<'a, K, V> {
1775        let db_iter = self
1776            .transaction
1777            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
1778        Iter::new(
1779            db.cf.clone(),
1780            RocksDBRawIter::OptimisticTransaction(db_iter),
1781            None,
1782            None,
1783            None,
1784            None,
1785            None,
1786        )
1787    }
1788
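    /// Commits the transaction, mapping write-conflict errors to
    /// `TypedStoreError::RetryableTransaction`.
    ///
    /// # Example
    ///
    /// A hedged sketch of the retry loop this error mapping enables;
    /// `build_tx` is an illustrative closure that fills the transaction,
    /// not part of this API.
    ///
    /// ```ignore
    /// loop {
    ///     let mut tx = DBTransaction::new(&rocks)?;
    ///     build_tx(&mut tx)?;
    ///     match tx.commit() {
    ///         Ok(()) => break,
    ///         // A concurrent writer touched one of our keys; rebuild and retry.
    ///         Err(TypedStoreError::RetryableTransaction) => continue,
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// ```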
1789    pub fn commit(self) -> Result<(), TypedStoreError> {
1790        fail_point!("transaction-commit");
1791        self.transaction.commit().map_err(|e| match e.kind() {
1792            // Empirically, this is what you get when there is a write conflict. It is
1793            // not documented whether this is the only case that produces this error.
1794            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
1795            _ => typed_store_err_from_rocks_err(e),
1796        })?;
1797        Ok(())
1798    }
1799}
1800
1801macro_rules! delegate_iter_call {
1802    ($self:ident.$method:ident($($args:ident),*)) => {
1803        match $self {
1804            Self::DB(db) => db.$method($($args),*),
1805            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
1806            Self::OptimisticTransaction(db) => db.$method($($args),*),
1807        }
1808    }
1809}
1810
1811pub enum RocksDBRawIter<'a> {
1812    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1813    OptimisticTransactionDB(
1814        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1815    ),
1816    OptimisticTransaction(
1817        rocksdb::DBRawIteratorWithThreadMode<
1818            'a,
1819            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1820        >,
1821    ),
1822}
1823
1824impl RocksDBRawIter<'_> {
1825    pub fn valid(&self) -> bool {
1826        delegate_iter_call!(self.valid())
1827    }
1828    pub fn key(&self) -> Option<&[u8]> {
1829        delegate_iter_call!(self.key())
1830    }
1831    pub fn value(&self) -> Option<&[u8]> {
1832        delegate_iter_call!(self.value())
1833    }
1834    pub fn next(&mut self) {
1835        delegate_iter_call!(self.next())
1836    }
1837    pub fn prev(&mut self) {
1838        delegate_iter_call!(self.prev())
1839    }
1840    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
1841        delegate_iter_call!(self.seek(key))
1842    }
1843    pub fn seek_to_last(&mut self) {
1844        delegate_iter_call!(self.seek_to_last())
1845    }
1846    pub fn seek_to_first(&mut self) {
1847        delegate_iter_call!(self.seek_to_first())
1848    }
1849    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
1850        delegate_iter_call!(self.seek_for_prev(key))
1851    }
1852    pub fn status(&self) -> Result<(), rocksdb::Error> {
1853        delegate_iter_call!(self.status())
1854    }
1855}
1856
1857pub enum RocksDBIter<'a> {
1858    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1859    OptimisticTransactionDB(
1860        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1861    ),
1862}
1863
1864impl Iterator for RocksDBIter<'_> {
1865    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
1866    fn next(&mut self) -> Option<Self::Item> {
1867        match self {
1868            Self::DB(db) => db.next(),
1869            Self::OptimisticTransactionDB(db) => db.next(),
1870        }
1871    }
1872}
1873
1874impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1875where
1876    K: Serialize + DeserializeOwned,
1877    V: Serialize + DeserializeOwned,
1878{
1879    type Error = TypedStoreError;
1880    type Iterator = Iter<'a, K, V>;
1881    type SafeIterator = SafeIter<'a, K, V>;
1882
1883    #[instrument(level = "trace", skip_all, err)]
1884    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1885        let key_buf = be_fix_int_ser(key)?;
1886        // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
1887        // but no false negatives. We use it to short-circuit the absent case
1888        let readopts = self.opts.readopts();
1889        Ok(self
1890            .rocksdb
1891            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1892            && self
1893                .rocksdb
1894                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1895                .map_err(typed_store_err_from_rocks_err)?
1896                .is_some())
1897    }
1898
1899    #[instrument(level = "trace", skip_all, err)]
1900    fn multi_contains_keys<J>(
1901        &self,
1902        keys: impl IntoIterator<Item = J>,
1903    ) -> Result<Vec<bool>, Self::Error>
1904    where
1905        J: Borrow<K>,
1906    {
1907        let values = self.multi_get_pinned(keys)?;
1908        Ok(values.into_iter().map(|v| v.is_some()).collect())
1909    }
1910
1911    #[instrument(level = "trace", skip_all, err)]
1912    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1913        let _timer = self
1914            .db_metrics
1915            .op_metrics
1916            .rocksdb_get_latency_seconds
1917            .with_label_values(&[&self.cf])
1918            .start_timer();
1919        let perf_ctx = if self.get_sample_interval.sample() {
1920            Some(RocksDBPerfContext)
1921        } else {
1922            None
1923        };
1924        let key_buf = be_fix_int_ser(key)?;
1925        let res = self
1926            .rocksdb
1927            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1928            .map_err(typed_store_err_from_rocks_err)?;
1929        self.db_metrics
1930            .op_metrics
1931            .rocksdb_get_bytes
1932            .with_label_values(&[&self.cf])
1933            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1934        if perf_ctx.is_some() {
1935            self.db_metrics
1936                .read_perf_ctx_metrics
1937                .report_metrics(&self.cf);
1938        }
1939        match res {
1940            Some(data) => Ok(Some(
1941                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1942            )),
1943            None => Ok(None),
1944        }
1945    }
1946
1947    #[instrument(level = "trace", skip_all, err)]
1948    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1949        let timer = self
1950            .db_metrics
1951            .op_metrics
1952            .rocksdb_put_latency_seconds
1953            .with_label_values(&[&self.cf])
1954            .start_timer();
1955        let perf_ctx = if self.write_sample_interval.sample() {
1956            Some(RocksDBPerfContext)
1957        } else {
1958            None
1959        };
1960        let key_buf = be_fix_int_ser(key)?;
1961        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1962        self.db_metrics
1963            .op_metrics
1964            .rocksdb_put_bytes
1965            .with_label_values(&[&self.cf])
1966            .observe((key_buf.len() + value_buf.len()) as f64);
1967        if perf_ctx.is_some() {
1968            self.db_metrics
1969                .write_perf_ctx_metrics
1970                .report_metrics(&self.cf);
1971        }
1972        self.rocksdb
1973            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
1974            .map_err(typed_store_err_from_rocks_err)?;
1975
1976        let elapsed = timer.stop_and_record();
1977        if elapsed > 1.0 {
1978            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1979            self.db_metrics
1980                .op_metrics
1981                .rocksdb_very_slow_puts_count
1982                .with_label_values(&[&self.cf])
1983                .inc();
1984            self.db_metrics
1985                .op_metrics
1986                .rocksdb_very_slow_puts_duration_ms
1987                .with_label_values(&[&self.cf])
1988                .inc_by((elapsed * 1000.0) as u64);
1989        }
1990
1991        Ok(())
1992    }
1993
1994    #[instrument(level = "trace", skip_all, err)]
1995    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1996        let _timer = self
1997            .db_metrics
1998            .op_metrics
1999            .rocksdb_delete_latency_seconds
2000            .with_label_values(&[&self.cf])
2001            .start_timer();
2002        let perf_ctx = if self.write_sample_interval.sample() {
2003            Some(RocksDBPerfContext)
2004        } else {
2005            None
2006        };
2007        let key_buf = be_fix_int_ser(key)?;
2008        self.rocksdb
2009            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
2010            .map_err(typed_store_err_from_rocks_err)?;
2011        self.db_metrics
2012            .op_metrics
2013            .rocksdb_deletes
2014            .with_label_values(&[&self.cf])
2015            .inc();
2016        if perf_ctx.is_some() {
2017            self.db_metrics
2018                .write_perf_ctx_metrics
2019                .report_metrics(&self.cf);
2020        }
2021        Ok(())
2022    }
2023
2024    /// This method first drops the existing column family and then creates a
2025    /// new one with the same name. The two operations are not atomic, so it
2026    /// is possible to hit a race condition where the column family has been
2027    /// dropped but the new one has not been created yet.
2028    #[instrument(level = "trace", skip_all, err)]
2029    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
2030        let _ = self.rocksdb.drop_cf(&self.cf);
2031        self.rocksdb
2032            .create_cf(self.cf.clone(), &default_db_options().options)
2033            .map_err(typed_store_err_from_rocks_err)?;
2034        Ok(())
2035    }
2036
2037    /// Writes a range delete tombstone to delete all entries in the db map.
2038    /// If the DBMap is configured with ignore_range_deletions set to false,
2039    /// the effect of this write is visible immediately, i.e. you won't
2040    /// see old values when you do a lookup or scan. But if it is configured
2041    /// with ignore_range_deletions set to true, the old values remain visible
2042    /// until compaction actually deletes them, which happens some time later. By
2043    /// default, ignore_range_deletions is set to true on a DBMap (unless it is
2044    /// overridden in the config), so please use this function with caution.
2045    #[instrument(level = "trace", skip_all, err)]
2046    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2047        let first_key = self.unbounded_iter().next().map(|(k, _v)| k);
2048        let last_key = self
2049            .reversed_safe_iter_with_bounds(None, None)?
2050            .next()
2051            .transpose()?
2052            .map(|(k, _v)| k);
2053        if let Some((first_key, last_key)) = first_key.zip(last_key) {
2054            let mut batch = self.batch();
2055            batch.schedule_delete_range(self, &first_key, &last_key)?;
2056            batch.write()?;
2057        }
2058        Ok(())
2059    }
2060
2061    fn is_empty(&self) -> bool {
2062        self.safe_iter().next().is_none()
2063    }
2064
2065    /// Returns an unbounded iterator visiting each key-value pair in the map.
2066    /// This is potentially expensive as it can perform a full table scan.
2067    fn unbounded_iter(&'a self) -> Self::Iterator {
2068        let db_iter = self
2069            .rocksdb
2070            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2071        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2072        Iter::new(
2073            self.cf.clone(),
2074            db_iter,
2075            _timer,
2076            _perf_ctx,
2077            bytes_scanned,
2078            keys_scanned,
2079            Some(self.db_metrics.clone()),
2080        )
2081    }
2082
2083    /// Returns an iterator visiting each key-value pair in the map. By providing
2084    /// bounds for the scan range, RocksDB can avoid unnecessary scans.
2085    /// The lower bound is inclusive, while the upper bound is exclusive.
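    ///
    /// # Example
    ///
    /// A hedged sketch of a bounded scan, assuming `db` is an open
    /// `DBMap<u32, String>` populated elsewhere (illustrative names only).
    ///
    /// ```ignore
    /// // Visits keys 10, 11, ..., 19; the upper bound 20 is excluded.
    /// for (k, v) in db.iter_with_bounds(Some(10), Some(20)) {
    ///     println!("{k} -> {v}");
    /// }
    /// ```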
2086    fn iter_with_bounds(
2087        &'a self,
2088        lower_bound: Option<K>,
2089        upper_bound: Option<K>,
2090    ) -> Self::Iterator {
2091        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2092        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2093        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2094        Iter::new(
2095            self.cf.clone(),
2096            db_iter,
2097            _timer,
2098            _perf_ctx,
2099            bytes_scanned,
2100            keys_scanned,
2101            Some(self.db_metrics.clone()),
2102        )
2103    }
2104
2105    /// Similar to `iter_with_bounds` but allows specifying
2106    /// inclusivity/exclusivity of ranges explicitly. TODO: find better name
2107    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2108        let readopts = self.create_read_options_with_range(range);
2109        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2110        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2111        Iter::new(
2112            self.cf.clone(),
2113            db_iter,
2114            _timer,
2115            _perf_ctx,
2116            bytes_scanned,
2117            keys_scanned,
2118            Some(self.db_metrics.clone()),
2119        )
2120    }
2121
2122    fn safe_iter(&'a self) -> Self::SafeIterator {
2123        let db_iter = self
2124            .rocksdb
2125            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2126        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2127        SafeIter::new(
2128            self.cf.clone(),
2129            db_iter,
2130            _timer,
2131            _perf_ctx,
2132            bytes_scanned,
2133            keys_scanned,
2134            Some(self.db_metrics.clone()),
2135        )
2136    }
2137
2138    fn safe_iter_with_bounds(
2139        &'a self,
2140        lower_bound: Option<K>,
2141        upper_bound: Option<K>,
2142    ) -> Self::SafeIterator {
2143        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2144        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2145        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2146        SafeIter::new(
2147            self.cf.clone(),
2148            db_iter,
2149            _timer,
2150            _perf_ctx,
2151            bytes_scanned,
2152            keys_scanned,
2153            Some(self.db_metrics.clone()),
2154        )
2155    }
2156
2157    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2158        let readopts = self.create_read_options_with_range(range);
2159        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2160        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2161        SafeIter::new(
2162            self.cf.clone(),
2163            db_iter,
2164            _timer,
2165            _perf_ctx,
2166            bytes_scanned,
2167            keys_scanned,
2168            Some(self.db_metrics.clone()),
2169        )
2170    }
2171
2172    /// Returns a vector of values corresponding to the keys provided.
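    ///
    /// # Example
    ///
    /// A hedged sketch, assuming `db` is an open `DBMap<u32, String>`; keys
    /// that are absent come back as `None` in the corresponding positions.
    ///
    /// ```ignore
    /// let values = db.multi_get([1u32, 2, 42])?;
    /// assert_eq!(values.len(), 3);
    /// ```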
2173    #[instrument(level = "trace", skip_all, err)]
2174    fn multi_get<J>(
2175        &self,
2176        keys: impl IntoIterator<Item = J>,
2177    ) -> Result<Vec<Option<V>>, TypedStoreError>
2178    where
2179        J: Borrow<K>,
2180    {
2181        let results = self.multi_get_pinned(keys)?;
2182        let values_parsed: Result<Vec<_>, TypedStoreError> = results
2183            .into_iter()
2184            .map(|value_byte| match value_byte {
2185                Some(data) => Ok(Some(
2186                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2187                )),
2188                None => Ok(None),
2189            })
2190            .collect();
2191
2192        values_parsed
2193    }
2194
2195    /// Convenience method for batch insertion
2196    #[instrument(level = "trace", skip_all, err)]
2197    fn multi_insert<J, U>(
2198        &self,
2199        key_val_pairs: impl IntoIterator<Item = (J, U)>,
2200    ) -> Result<(), Self::Error>
2201    where
2202        J: Borrow<K>,
2203        U: Borrow<V>,
2204    {
2205        let mut batch = self.batch();
2206        batch.insert_batch(self, key_val_pairs)?;
2207        batch.write()
2208    }
2209
2210    /// Convenience method for batch removal
2211    #[instrument(level = "trace", skip_all, err)]
2212    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2213    where
2214        J: Borrow<K>,
2215    {
2216        let mut batch = self.batch();
2217        batch.delete_batch(self, keys)?;
2218        batch.write()
2219    }
2220
2221    /// Tries to catch up with the primary when running as a secondary.
2222    #[instrument(level = "trace", skip_all, err)]
2223    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2224        self.rocksdb
2225            .try_catch_up_with_primary()
2226            .map_err(typed_store_err_from_rocks_err)
2227    }
2228}
2229
2230impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
2231where
2232    J: Borrow<K>,
2233    U: Borrow<V>,
2234    K: Serialize,
2235    V: Serialize,
2236{
2237    type Error = TypedStoreError;
2238
2239    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
2240    where
2241        T: Iterator<Item = (J, U)>,
2242    {
2243        let mut batch = self.batch();
2244        batch.insert_batch(self, iter)?;
2245        batch.write()
2246    }
2247
2248    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
2249        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
2250        let mut batch = self.batch();
2251        batch.insert_batch(self, slice_of_refs)?;
2252        batch.write()
2253    }
2254}
2255
2256pub fn read_size_from_env(var_name: &str) -> Option<usize> {
2257    env::var(var_name)
2258        .ok()?
2259        .parse::<usize>()
2260        .tap_err(|e| {
2261            warn!(
2262                "Env var {} does not contain valid usize integer: {}",
2263                var_name, e
2264            )
2265        })
2266        .ok()
2267}
2268
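/// Per-table read and write options applied by `DBMap` operations.
///
/// # Example
///
/// A hedged sketch of making range-delete tombstones visible to reads
/// immediately instead of relying on the default ignore behavior; the column
/// family name is illustrative.
///
/// ```ignore
/// let rw = ReadWriteOptions::default().set_ignore_range_deletions(false);
/// let map: DBMap<u32, String> = DBMap::reopen(&rocks, Some("MY_CF"), &rw, false)?;
/// ```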
2269#[derive(Clone, Debug)]
2270pub struct ReadWriteOptions {
2271    pub ignore_range_deletions: bool,
2272    // Whether to sync to disk on every write.
2273    sync_to_disk: bool,
2274}
2275
2276impl ReadWriteOptions {
2277    pub fn readopts(&self) -> ReadOptions {
2278        let mut readopts = ReadOptions::default();
2279        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
2280        readopts
2281    }
2282
2283    pub fn writeopts(&self) -> WriteOptions {
2284        let mut opts = WriteOptions::default();
2285        opts.set_sync(self.sync_to_disk);
2286        opts
2287    }
2288
2289    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
2290        self.ignore_range_deletions = ignore;
2291        self
2292    }
2293}
2294
2295impl Default for ReadWriteOptions {
2296    fn default() -> Self {
2297        Self {
2298            ignore_range_deletions: true,
2299            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
2300        }
2301    }
2302}
2303// TODO: refactor this into a builder pattern, where rocksdb::Options are
2304// generated after a call to build().
2305#[derive(Default, Clone)]
2306pub struct DBOptions {
2307    pub options: rocksdb::Options,
2308    pub rw_options: ReadWriteOptions,
2309}
2310
2311impl DBOptions {
2312    // Optimize lookup perf for tables where no scans are performed.
2313    // If a non-trivial number of values can be > 512B in size, it is beneficial to
2314    // also specify optimize_for_large_values_no_scan().
2315    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
2316        // NOTE: this overwrites the block options.
2317        self.options
2318            .optimize_for_point_lookup(block_cache_size_mb as u64);
2319        self
2320    }
2321
2322    // Optimize write and lookup perf for tables which are rarely scanned, and have
2323    // large values. https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
2324    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
2325        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
2326            info!("Large value blob storage optimization is disabled via env var.");
2327            return self;
2328        }
2329
2330        // Blob settings.
2331        self.options.set_enable_blob_files(true);
2332        self.options
2333            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
2334        self.options.set_enable_blob_gc(true);
2335        // Since each blob can have non-trivial size overhead, and compression does not
2336        // work across blobs, set a min blob size in bytes so that small
2337        // transactions and effects are kept in sst files.
2338        self.options.set_min_blob_size(min_blob_size);
2339
2340        // Increase write buffer size to 256MiB.
2341        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2342            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2343            * 1024
2344            * 1024;
2345        self.options.set_write_buffer_size(write_buffer_size);
2346        // Since large blobs are not in sst files, reduce the target file size and base
2347        // level target size.
2348        let target_file_size_base = 64 << 20;
2349        self.options
2350            .set_target_file_size_base(target_file_size_base);
2351        // Level 1 defaults to 64MiB * 4 ~ 256MiB.
2352        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2353            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2354        self.options
2355            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
2356
2357        self
2358    }
2359
2360    // Optimize tables with a mix of lookup and scan workloads.
2361    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
2362        self.options
2363            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
2364        self
2365    }
2366
2367    // Optimize DB receiving significant insertions.
2368    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
2369        self.options
2370            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
2371        self.options
2372            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
2373        self
2374    }
2375
2376    // Optimize tables receiving significant insertions.
2377    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
2378        // Increase write buffer size to 256MiB.
2379        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2380            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2381            * 1024
2382            * 1024;
2383        self.options.set_write_buffer_size(write_buffer_size);
2384        // Allow up to 6 write buffers before slowing down writes.
2385        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2386            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2387        self.options
2388            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2389        // Keep 1 write buffer so recent writes can be read from memory.
2390        self.options
2391            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2392
2393        // Set the compaction trigger for level 0 (4 files by default).
2394        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2395            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2396        self.options.set_level_zero_file_num_compaction_trigger(
2397            max_level_zero_file_num.try_into().unwrap(),
2398        );
2399        self.options.set_level_zero_slowdown_writes_trigger(
2400            (max_level_zero_file_num * 12).try_into().unwrap(),
2401        );
2402        self.options
2403            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2404
2405        // Increase sst file size to 128MiB.
2406        self.options.set_target_file_size_base(
2407            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2408                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2409                * 1024
2410                * 1024,
2411        );
2412
2413        // Increase level 1 target size to 256MiB * 4 ~ 1GiB.
2414        self.options
2415            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2416
2417        self
2418    }
2419
2420    // Optimize tables receiving significant insertions, without any deletions.
2421    // TODO: merge this function with optimize_for_write_throughput(), and use a
2422    // flag to indicate if deletion is received.
2423    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
2424        // Increase write buffer size to 256MiB.
2425        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2426            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2427            * 1024
2428            * 1024;
2429        self.options.set_write_buffer_size(write_buffer_size);
2430        // Allow up to 6 write buffers before slowing down writes.
2431        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2432            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2433        self.options
2434            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2435        // Keep 1 write buffer so recent writes can be read from memory.
2436        self.options
2437            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2438
2439        // Switch to universal compactions.
2440        self.options
2441            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
2442        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
2443        compaction_options.set_max_size_amplification_percent(10000);
2444        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
2445        self.options
2446            .set_universal_compaction_options(&compaction_options);
2447
2448        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2449            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
2450        self.options.set_level_zero_file_num_compaction_trigger(
2451            max_level_zero_file_num.try_into().unwrap(),
2452        );
2453        self.options.set_level_zero_slowdown_writes_trigger(
2454            (max_level_zero_file_num * 12).try_into().unwrap(),
2455        );
2456        self.options
2457            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2458
2459        // Increase sst file size to 128MiB.
2460        self.options.set_target_file_size_base(
2461            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2462                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2463                * 1024
2464                * 1024,
2465        );
2466
2467        // This should be a no-op for universal compaction but increasing it to be safe.
2468        self.options
2469            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2470
2471        self
2472    }
2473
2474    // Overrides the block options with different block cache size and block size.
2475    pub fn set_block_options(
2476        mut self,
2477        block_cache_size_mb: usize,
2478        block_size_bytes: usize,
2479    ) -> DBOptions {
2480        self.options
2481            .set_block_based_table_factory(&get_block_options(
2482                block_cache_size_mb,
2483                block_size_bytes,
2484            ));
2485        self
2486    }
2487
2488    // Disables write stalling and stopping based on pending compaction bytes.
2489    pub fn disable_write_throttling(mut self) -> DBOptions {
2490        self.options.set_soft_pending_compaction_bytes_limit(0);
2491        self.options.set_hard_pending_compaction_bytes_limit(0);
2492        self
2493    }
2494}
2495
2496/// Creates the default RocksDB options, to be used when no options are
2497/// specified.
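///
/// # Example
///
/// A hedged sketch of layering workload-specific tuning on top of these
/// defaults before opening a database; the 64 MiB cache size is illustrative.
///
/// ```ignore
/// let tuned = default_db_options()
///     .optimize_for_write_throughput()
///     .optimize_for_read(64); // 64 MiB block cache
/// ```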
2498pub fn default_db_options() -> DBOptions {
2499    let mut opt = rocksdb::Options::default();
2500
2501    // One common issue when running tests on Mac is that the default ulimit is too
2502    // low, leading to I/O errors such as "Too many open files". Raising fdlimit
2503    // to bypass it.
2504    if let Some(limit) = fdlimit::raise_fd_limit() {
2505        // On Windows, raise_fd_limit returns None.
2506        opt.set_max_open_files((limit / 8) as i32);
2507    }
2508
2509    // The table cache is locked for updates and this determines the number
2510    // of shards, i.e. 2^10. Increase it in case of lock contention.
2511    opt.set_table_cache_num_shard_bits(10);
2512
2513    // LSM compression settings
2514    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
2515    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
2516    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
2517
2518    // IOTA uses multiple RocksDB instances in a node, so the total sizes of write
2519    // buffers and WAL can be higher than the limits below.
2520    //
2521    // RocksDB also exposes the option to configure total write buffer size across
2522    // multiple instances via `write_buffer_manager`. But the write buffer flush
2523    // policy (flushing the buffer receiving the next write) may not work well.
2524    // So sticking to per-db write buffer size limit for now.
2525    //
2526    // The environment variables are only meant to be emergency overrides. They may
2527    // go away in the future. It is preferable to update the default value, or
2528    // override the option in code.
2529    opt.set_db_write_buffer_size(
2530        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
2531            * 1024
2532            * 1024,
2533    );
2534    opt.set_max_total_wal_size(
2535        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
2536    );
2537
2538    // Num threads for compactions and memtable flushes.
2539    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
2540
2541    opt.set_enable_pipelined_write(true);
2542
2543    // Increase block size to 16KiB.
2544    // https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md#indexes-and-filter-blocks
2545    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
2546
2547    // Set memtable bloomfilter.
2548    opt.set_memtable_prefix_bloom_ratio(0.02);
2549
2550    DBOptions {
2551        options: opt,
2552        rw_options: ReadWriteOptions::default(),
2553    }
2554}
2555
2556fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
2557    // Set options mostly similar to those used in optimize_for_point_lookup(),
2558    // except non-default binary and hash index, to hopefully reduce lookup
2559    // latencies without causing any regression for scanning, with slightly more
2560    // memory usage. https://github.com/facebook/rocksdb/blob/11cb6af6e5009c51794641905ca40ce5beec7fee/options/options.cc#L611-L621
2561    let mut block_options = BlockBasedOptions::default();
2562    // Overrides block size.
2563    block_options.set_block_size(block_size_bytes);
2564    // Configure a block cache.
2565    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
2566    // Set a bloomfilter with 1% false positive rate.
2567    block_options.set_bloom_filter(10.0, false);
2568    // From https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#caching-index-and-filter-blocks
2569    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
2570    block_options
2571}
2572
2573/// Opens a database with options, and a number of column families that are
2574/// created if they do not exist.
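///
/// # Example
///
/// A hedged sketch of opening a database and wrapping one of its column
/// families in a typed map; the path, column family name, and the
/// `MetricConf::default()` constructor are illustrative assumptions.
///
/// ```ignore
/// let rocks = open_cf(
///     "/tmp/mydb",
///     None, // fall back to default_db_options()
///     MetricConf::default(),
///     &["MY_CF"],
/// )?;
/// let map: DBMap<u32, String> =
///     DBMap::reopen(&rocks, Some("MY_CF"), &ReadWriteOptions::default(), false)?;
/// ```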
2575#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
2576pub fn open_cf<P: AsRef<Path>>(
2577    path: P,
2578    db_options: Option<rocksdb::Options>,
2579    metric_conf: MetricConf,
2580    opt_cfs: &[&str],
2581) -> Result<Arc<RocksDB>, TypedStoreError> {
2582    let options = db_options.unwrap_or_else(|| default_db_options().options);
2583    let column_descriptors: Vec<_> = opt_cfs
2584        .iter()
2585        .map(|name| (*name, options.clone()))
2586        .collect();
2587    open_cf_opts(
2588        path,
2589        Some(options.clone()),
2590        metric_conf,
2591        &column_descriptors[..],
2592    )
2593}
2594
2595fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
2596    // Customize database options
2597    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2598    options.create_if_missing(true);
2599    options.create_missing_column_families(true);
2600    options
2601}
2602
2603/// Opens a database with options, and a number of column families with
2604/// individual options that are created if they do not exist.
2605#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2606pub fn open_cf_opts<P: AsRef<Path>>(
2607    path: P,
2608    db_options: Option<rocksdb::Options>,
2609    metric_conf: MetricConf,
2610    opt_cfs: &[(&str, rocksdb::Options)],
2611) -> Result<Arc<RocksDB>, TypedStoreError> {
2612    let path = path.as_ref();
2613    // In the simulator, we intercept the wall clock in the test thread only. This
2614    // causes problems because rocksdb uses the simulated clock when creating
2615    // its background threads, but then those threads see the real wall clock
2616    // (because they are not the test thread), which causes rocksdb to panic.
2617    // The `nondeterministic` macro evaluates expressions in new threads, which
2618    // resolves the issue.
2619    //
2620    // This is a no-op in non-simulator builds.
2621
2622    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2623    nondeterministic!({
2624        let options = prepare_db_options(db_options);
2625        let rocksdb = {
2626            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2627                &options,
2628                path,
2629                cfs.into_iter()
2630                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2631            )
2632            .map_err(typed_store_err_from_rocks_err)?
2633        };
2634        Ok(Arc::new(RocksDB::DBWithThreadMode(
2635            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2636        )))
2637    })
2638}
2639
2640/// Opens an optimistic transaction database with options, and a number of
2641/// column families with individual options that are created if they do not exist.
2642#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2643pub fn open_cf_opts_transactional<P: AsRef<Path>>(
2644    path: P,
2645    db_options: Option<rocksdb::Options>,
2646    metric_conf: MetricConf,
2647    opt_cfs: &[(&str, rocksdb::Options)],
2648) -> Result<Arc<RocksDB>, TypedStoreError> {
2649    let path = path.as_ref();
2650    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2651    // See comment above for explanation of why nondeterministic is necessary here.
2652    nondeterministic!({
2653        let options = prepare_db_options(db_options);
2654        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
2655            &options,
2656            path,
2657            cfs.into_iter()
2658                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2659        )
2660        .map_err(typed_store_err_from_rocks_err)?;
2661        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
2662            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2663        )))
2664    })
2665}
2666
2667/// Opens a database in read-only secondary mode, with a number of column
2668/// families with individual options, and tries to catch up with the primary.
2669pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2670    primary_path: P,
2671    secondary_path: Option<P>,
2672    db_options: Option<rocksdb::Options>,
2673    metric_conf: MetricConf,
2674    opt_cfs: &[(&str, rocksdb::Options)],
2675) -> Result<Arc<RocksDB>, TypedStoreError> {
2676    let primary_path = primary_path.as_ref();
2677    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2678    // See comment above for explanation of why nondeterministic is necessary here.
2679    nondeterministic!({
2680        // Customize database options
2681        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2682
2683        fdlimit::raise_fd_limit();
2684        // This is a requirement by RocksDB when opening as secondary
2685        options.set_max_open_files(-1);
2686
2687        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2688        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2689            .ok()
2690            .unwrap_or_default();
2691
2692        let default_db_options = default_db_options();
2693        // Add CFs not explicitly listed
2694        for cf_key in cfs.iter() {
2695            if !opt_cfs.contains_key(&cf_key[..]) {
2696                opt_cfs.insert(cf_key, default_db_options.options.clone());
2697            }
2698        }
2699
2700        let primary_path = primary_path.to_path_buf();
2701        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2702            let mut s = primary_path.clone();
2703            s.pop();
2704            s.push("SECONDARY");
2705            s.as_path().to_path_buf()
2706        });
2707
2708        let rocksdb = {
2709            options.create_if_missing(true);
2710            options.create_missing_column_families(true);
2711            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2712                &options,
2713                &primary_path,
2714                &secondary_path,
2715                opt_cfs
2716                    .iter()
2717                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2718            )
2719            .map_err(typed_store_err_from_rocks_err)?;
2720            db.try_catch_up_with_primary()
2721                .map_err(typed_store_err_from_rocks_err)?;
2722            db
2723        };
2724        Ok(Arc::new(RocksDB::DBWithThreadMode(
2725            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
2726        )))
2727    })
2728}
2729
2730pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2731    const DB_DEFAULT_CF_NAME: &str = "default";
2732
2733    let opts = rocksdb::Options::default();
2734    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2735        .map_err(|e| e.into())
2736        .map(|q| {
2737            q.iter()
2738                .filter_map(|s| {
2739                    // The `default` table is not used
2740                    if s != DB_DEFAULT_CF_NAME {
2741                        Some(s.clone())
2742                    } else {
2743                        None
2744                    }
2745                })
2746                .collect()
2747        })
2748}
2749
2750/// Serializes a key using big-endian, fixed-width integer encoding so that the serialized byte order matches the logical key order; RocksDB stores keys in byte order and exposes a seek operator on iterators, see `https://github.com/facebook/rocksdb/wiki/Iterator#introduction`
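///
/// # Example
///
/// A hedged sketch of why this encoding matters for iteration order: with
/// big-endian fixed-width integers, byte-wise (lexicographic) comparison
/// agrees with numeric comparison.
///
/// ```ignore
/// let a = be_fix_int_ser(&1u64)?;
/// let b = be_fix_int_ser(&256u64)?;
/// assert!(a < b); // byte order matches numeric order
/// ```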
2751#[inline]
2752pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2753where
2754    S: ?Sized + serde::Serialize,
2755{
2756    bincode::DefaultOptions::new()
2757        .with_big_endian()
2758        .with_fixint_encoding()
2759        .serialize(t)
2760        .map_err(typed_store_err_from_bincode_err)
2761}
2762
2763#[derive(Clone)]
2764pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
2765impl DBMapTableConfigMap {
2766    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
2767        Self(map)
2768    }
2769
2770    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
2771        self.0.clone()
2772    }
2773}
2774
2775pub enum RocksDBAccessType {
2776    Primary,
2777    Secondary(Option<PathBuf>),
2778}
2779
2780pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
2781    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
2782}
2783
2784fn populate_missing_cfs(
2785    input_cfs: &[(&str, rocksdb::Options)],
2786    path: &Path,
2787) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2788    let mut cfs = vec![];
2789    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2790    let existing_cfs =
2791        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2792            .ok()
2793            .unwrap_or_default();
2794
2795    for cf_name in existing_cfs {
2796        if !input_cf_index.contains(&cf_name[..]) {
2797            cfs.push((cf_name, rocksdb::Options::default()));
2798        }
2799    }
2800    cfs.extend(
2801        input_cfs
2802            .iter()
2803            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2804    );
2805    Ok(cfs)
2806}
2807
2808/// Given a vec<u8>, find the value which is one more than the vector
2809/// if the vector was a big endian number.
2810/// If the vector is already at its maximum value, don't change it.
2811fn big_endian_saturating_add_one(v: &mut [u8]) {
2812    if is_max(v) {
2813        return;
2814    }
2815    for i in (0..v.len()).rev() {
2816        if v[i] == u8::MAX {
2817            v[i] = 0;
2818        } else {
2819            v[i] += 1;
2820            break;
2821        }
2822    }
2823}
2824
2825/// Check if all the bytes in the vector are 0xFF
2826fn is_max(v: &[u8]) -> bool {
2827    v.iter().all(|&x| x == u8::MAX)
2828}
2829
2830// `clippy::manual_div_ceil` is raised by code expanded by the
2831// `uint::construct_uint!` macro so it needs to be fixed by `uint`
2832#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
2833#[test]
2834fn test_helpers() {
2835    let v = vec![];
2836    assert!(is_max(&v));
2837
2838    fn check_add(v: Vec<u8>) {
2839        let mut v = v;
2840        let num = Num32::from_big_endian(&v);
2841        big_endian_saturating_add_one(&mut v);
2842        assert!(num + 1 == Num32::from_big_endian(&v));
2843    }
2844
2845    uint::construct_uint! {
2846        // 32 byte number
2847        struct Num32(4);
2848    }
2849
2850    let mut v = vec![255; 32];
2851    big_endian_saturating_add_one(&mut v);
2852    assert!(Num32::MAX == Num32::from_big_endian(&v));
2853
2854    check_add(vec![1; 32]);
2855    check_add(vec![6; 32]);
2856    check_add(vec![254; 32]);
2857
2858    // TBD: More tests coming with randomized arrays
2859}