typed_store/rocks/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod errors;
6pub(crate) mod iter;
7pub(crate) mod keys;
8pub(crate) mod safe_iter;
9pub mod util;
10pub(crate) mod values;
11
12use std::{
13    borrow::Borrow,
14    collections::{BTreeMap, HashSet},
15    env,
16    ffi::CStr,
17    marker::PhantomData,
18    ops::{Bound, RangeBounds},
19    path::{Path, PathBuf},
20    sync::Arc,
21    time::Duration,
22};
23
24use bincode::Options;
25use collectable::TryExtend;
26use iota_macros::{fail_point, nondeterministic};
27use itertools::Itertools;
28use prometheus::{Histogram, HistogramTimer};
29use rocksdb::{
30    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
31    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
32    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
33    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
34    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
35};
36use serde::{Serialize, de::DeserializeOwned};
37use tap::TapFallible;
38use tokio::sync::oneshot;
39use tracing::{debug, error, info, instrument, warn};
40
41use self::{iter::Iter, keys::Keys, values::Values};
42use crate::{
43    TypedStoreError,
44    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
45    rocks::{
46        errors::{
47            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
48            typed_store_err_from_rocks_err,
49        },
50        safe_iter::SafeIter,
51    },
52    traits::{Map, TableSummary},
53};
54
55// Write buffer size per RocksDB instance can be set via the env var below.
56// If the env var is not set, use the default value in MiB.
57const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
58const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;
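// For example, launching a node with `DB_WRITE_BUFFER_SIZE_MB=2048` in its environment
// raises the per-instance write buffer budget to 2 GiB instead of the 1024 MiB default
// above; the same env-var-with-MiB-default pattern applies to the settings below.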
59
60// Write ahead log size per RocksDB instance can be set via the env var below.
61// If the env var is not set, use the default value in MiB.
62const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
63const DEFAULT_DB_WAL_SIZE: usize = 1024;
64
65// Environment variable to control behavior of write throughput optimized
66// tables.
67const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
68const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
69const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
70const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
71const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
72const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
73const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
74const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
75const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;
76
77// Set to 1 to disable blob storage for transactions and effects.
78const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
79
80const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
81
82// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
83// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
84const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
85    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
86
87const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";
88
89#[cfg(test)]
90mod tests;
91
92/// A helper macro to reopen multiple column families. The macro returns
93/// a tuple of DBMap structs in the same order that the column families
94/// are defined.
95///
96/// # Arguments
97///
98/// * `db` - a reference to a rocks DB object
99/// * `cf;<ty,ty>` - a comma-separated list of column families to open. For each
100///   column family, provide its name (`cf`) followed by the key and value types
101///   `<ty, ty>`.
102///
103/// # Examples
104///
105/// We successfully open two different column families.
106/// ```
107/// use typed_store::reopen;
108/// use typed_store::rocks::*;
109/// use tempfile::tempdir;
110/// use prometheus::Registry;
111/// use std::sync::Arc;
112/// use typed_store::metrics::DBMetrics;
113/// use core::fmt::Error;
114///
115/// #[tokio::main]
116/// async fn main() -> Result<(), Error> {
117///     const FIRST_CF: &str = "First_CF";
118///     const SECOND_CF: &str = "Second_CF";
119///
120///
121///     // Create the rocks database reference for the desired column families
122///     let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &[FIRST_CF, SECOND_CF]).unwrap();
123///
124///     // Now simply open all the column families for their expected Key-Value types
125///     let (db_map_1, db_map_2) = reopen!(&rocks, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
126///     Ok(())
127/// }
128/// ```
129#[macro_export]
130macro_rules! reopen {
131    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
132        (
133            $(
134                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
135            ),*
136        )
137    };
138}
139
140/// Repeatedly attempts an optimistic transaction until it succeeds or the retry
141/// limit is reached (20 retries by default). Callsites that cannot proceed on failed
142/// writes (e.g. the consensus handler) should pass `None` or use `retry_transaction_forever!`.
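///
/// A usage sketch (not compiled here; it assumes a `DBMap` named `db` and a
/// `DBTransaction` API whose `commit` surfaces write conflicts as
/// `TypedStoreError::RetryableTransaction`):
/// ```ignore
/// let result = retry_transaction!({
///     let mut tx = db.transaction()?;
///     tx.insert_batch(&db, [(1u32, "one".to_string())])?;
///     // A retryable commit error makes the macro sleep briefly and re-run this block.
///     tx.commit()
/// });
/// ```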
143#[macro_export]
144macro_rules! retry_transaction {
145    ($transaction:expr) => {
146        retry_transaction!($transaction, Some(20))
147    };
148
149    (
150        $transaction:expr,
151        $max_retries:expr // should be an Option<int type>, None for unlimited
152        $(,)?
153
154    ) => {{
155        use std::time::Duration;
156
157        use rand::{
158            distributions::{Distribution, Uniform},
159            rngs::ThreadRng,
160        };
161        use tracing::{error, info};
162
163        let mut retries = 0;
164        let max_retries = $max_retries;
165        loop {
166            let status = $transaction;
167            match status {
168                Err(TypedStoreError::RetryableTransaction) => {
169                    retries += 1;
170                    // Randomized delay to help racing transactions get out of each other's way.
171                    let delay = {
172                        let mut rng = ThreadRng::default();
173                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
174                    };
175                    if let Some(max_retries) = max_retries {
176                        if retries > max_retries {
177                            error!(?max_retries, "max retries exceeded");
178                            break status;
179                        }
180                    }
181                    if retries > 10 {
182                        // TODO: monitoring needed?
183                        error!(?delay, ?retries, "excessive transaction retries...");
184                    } else {
185                        info!(
186                            ?delay,
187                            ?retries,
188                            "transaction write conflict detected, sleeping"
189                        );
190                    }
191                    std::thread::sleep(delay);
192                }
193                _ => break status,
194            }
195        }
196    }};
197}
198
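/// Convenience form of [`retry_transaction!`] that never gives up: it passes `None` as
/// the retry limit and keeps retrying until the transaction succeeds.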
199#[macro_export]
200macro_rules! retry_transaction_forever {
201    ($transaction:expr) => {
202        $crate::retry_transaction!($transaction, None)
203    };
204}
205
206#[derive(Debug)]
207pub struct DBWithThreadModeWrapper {
208    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
209    pub metric_conf: MetricConf,
210    pub db_path: PathBuf,
211}
212
213impl DBWithThreadModeWrapper {
214    fn new(
215        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
216        metric_conf: MetricConf,
217        db_path: PathBuf,
218    ) -> Self {
219        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
220        Self {
221            underlying,
222            metric_conf,
223            db_path,
224        }
225    }
226}
227
228impl Drop for DBWithThreadModeWrapper {
229    fn drop(&mut self) {
230        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
231    }
232}
233
234#[derive(Debug)]
235pub struct OptimisticTransactionDBWrapper {
236    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
237    pub metric_conf: MetricConf,
238    pub db_path: PathBuf,
239}
240
241impl OptimisticTransactionDBWrapper {
242    fn new(
243        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
244        metric_conf: MetricConf,
245        db_path: PathBuf,
246    ) -> Self {
247        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
248        Self {
249            underlying,
250            metric_conf,
251            db_path,
252        }
253    }
254}
255
256impl Drop for OptimisticTransactionDBWrapper {
257    fn drop(&mut self) {
258        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
259    }
260}
261
262/// Thin wrapper to unify interface across different db types
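///
/// A usage sketch (the path and column family name are illustrative):
/// ```ignore
/// let db: Arc<RocksDB> =
///     open_cf("/tmp/example-db", None, MetricConf::new("example"), &["cf1"]).unwrap();
/// let cf = db.cf_handle("cf1").expect("cf1 was opened above");
/// ```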
263#[derive(Debug)]
264pub enum RocksDB {
265    DBWithThreadMode(DBWithThreadModeWrapper),
266    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
267}
268
269macro_rules! delegate_call {
270    ($self:ident.$method:ident($($args:ident),*)) => {
271        match $self {
272            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
273            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
274        }
275    }
276}
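// For illustration: `delegate_call!(self.flush())` expands to a `match` on `self` that
// invokes `d.underlying.flush()` on whichever wrapper variant (`DBWithThreadMode` or
// `OptimisticTransactionDB`) is active.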
277
278impl Drop for RocksDB {
279    fn drop(&mut self) {
280        delegate_call!(self.cancel_all_background_work(/* wait */ true))
281    }
282}
283
284impl RocksDB {
285    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
286        delegate_call!(self.get(key))
287    }
288
289    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
290        &'a self,
291        keys: I,
292        readopts: &ReadOptions,
293    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
294    where
295        K: AsRef<[u8]>,
296        I: IntoIterator<Item = (&'b W, K)>,
297        W: 'b + AsColumnFamilyRef,
298    {
299        delegate_call!(self.multi_get_cf_opt(keys, readopts))
300    }
301
302    pub fn batched_multi_get_cf_opt<I, K>(
303        &self,
304        cf: &impl AsColumnFamilyRef,
305        keys: I,
306        sorted_input: bool,
307        readopts: &ReadOptions,
308    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
309    where
310        I: IntoIterator<Item = K>,
311        K: AsRef<[u8]>,
312    {
313        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
314    }
315
316    pub fn property_int_value_cf(
317        &self,
318        cf: &impl AsColumnFamilyRef,
319        name: impl CStrLike,
320    ) -> Result<Option<u64>, rocksdb::Error> {
321        delegate_call!(self.property_int_value_cf(cf, name))
322    }
323
324    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
325        &self,
326        cf: &impl AsColumnFamilyRef,
327        key: K,
328        readopts: &ReadOptions,
329    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
330        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
331    }
332
333    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
334        delegate_call!(self.cf_handle(name))
335    }
336
337    pub fn create_cf<N: AsRef<str>>(
338        &self,
339        name: N,
340        opts: &rocksdb::Options,
341    ) -> Result<(), rocksdb::Error> {
342        delegate_call!(self.create_cf(name, opts))
343    }
344
345    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
346        delegate_call!(self.drop_cf(name))
347    }
348
349    pub fn delete_file_in_range<K: AsRef<[u8]>>(
350        &self,
351        cf: &impl AsColumnFamilyRef,
352        from: K,
353        to: K,
354    ) -> Result<(), rocksdb::Error> {
355        delegate_call!(self.delete_file_in_range_cf(cf, from, to))
356    }
357
358    pub fn delete_cf<K: AsRef<[u8]>>(
359        &self,
360        cf: &impl AsColumnFamilyRef,
361        key: K,
362        writeopts: &WriteOptions,
363    ) -> Result<(), rocksdb::Error> {
364        fail_point!("delete-cf-before");
365        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
366        fail_point!("delete-cf-after");
367        #[expect(clippy::let_and_return)]
368        ret
369    }
370
371    pub fn path(&self) -> &Path {
372        delegate_call!(self.path())
373    }
374
375    pub fn put_cf<K, V>(
376        &self,
377        cf: &impl AsColumnFamilyRef,
378        key: K,
379        value: V,
380        writeopts: &WriteOptions,
381    ) -> Result<(), rocksdb::Error>
382    where
383        K: AsRef<[u8]>,
384        V: AsRef<[u8]>,
385    {
386        fail_point!("put-cf-before");
387        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
388        fail_point!("put-cf-after");
389        #[expect(clippy::let_and_return)]
390        ret
391    }
392
393    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
394        &self,
395        cf: &impl AsColumnFamilyRef,
396        key: K,
397        readopts: &ReadOptions,
398    ) -> bool {
399        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
400    }
401
402    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
403        delegate_call!(self.try_catch_up_with_primary())
404    }
405
406    pub fn write(
407        &self,
408        batch: RocksDBBatch,
409        writeopts: &WriteOptions,
410    ) -> Result<(), TypedStoreError> {
411        fail_point!("batch-write-before");
412        let ret = match (self, batch) {
413            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
414                db.underlying
415                    .write_opt(batch, writeopts)
416                    .map_err(typed_store_err_from_rocks_err)?;
417                Ok(())
418            }
419            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
420                db.underlying
421                    .write_opt(batch, writeopts)
422                    .map_err(typed_store_err_from_rocks_err)?;
423                Ok(())
424            }
425            _ => Err(TypedStoreError::RocksDB(
426                "using invalid batch type for the database".to_string(),
427            )),
428        };
429        fail_point!("batch-write-after");
430        #[expect(clippy::let_and_return)]
431        ret
432    }
433
434    pub fn transaction_without_snapshot(
435        &self,
436    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
437        match self {
438            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
439            Self::DBWithThreadMode(_) => panic!(),
440        }
441    }
442
443    pub fn transaction(
444        &self,
445    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
446        match self {
447            Self::OptimisticTransactionDB(db) => {
448                let mut tx_opts = OptimisticTransactionOptions::new();
449                tx_opts.set_snapshot(true);
450
451                Ok(db
452                    .underlying
453                    .transaction_opt(&WriteOptions::default(), &tx_opts))
454            }
455            Self::DBWithThreadMode(_) => panic!(),
456        }
457    }
458
459    pub fn raw_iterator_cf<'a: 'b, 'b>(
460        &'a self,
461        cf_handle: &impl AsColumnFamilyRef,
462        readopts: ReadOptions,
463    ) -> RocksDBRawIter<'b> {
464        match self {
465            Self::DBWithThreadMode(db) => {
466                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
467            }
468            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
469                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
470            ),
471        }
472    }
473
474    pub fn iterator_cf<'a: 'b, 'b>(
475        &'a self,
476        cf_handle: &impl AsColumnFamilyRef,
477        readopts: ReadOptions,
478        mode: IteratorMode<'_>,
479    ) -> RocksDBIter<'b> {
480        match self {
481            Self::DBWithThreadMode(db) => {
482                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
483            }
484            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
485                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
486            ),
487        }
488    }
489
490    pub fn compact_range_cf<K: AsRef<[u8]>>(
491        &self,
492        cf: &impl AsColumnFamilyRef,
493        start: Option<K>,
494        end: Option<K>,
495    ) {
496        delegate_call!(self.compact_range_cf(cf, start, end))
497    }
498
499    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
500        &self,
501        cf: &impl AsColumnFamilyRef,
502        start: Option<K>,
503        end: Option<K>,
504    ) {
505        let opt = &mut CompactOptions::default();
506        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
507        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
508    }
509
510    pub fn flush(&self) -> Result<(), TypedStoreError> {
511        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
512    }
513
514    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
515        match self {
516            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
517            Self::OptimisticTransactionDB(d) => {
518                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
519            }
520        }
521    }
522
523    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
524        let checkpoint = match self {
525            Self::DBWithThreadMode(d) => {
526                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
527            }
528            Self::OptimisticTransactionDB(d) => {
529                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
530            }
531        };
532        checkpoint
533            .create_checkpoint(path)
534            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
535        Ok(())
536    }
537
538    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
539        delegate_call!(self.flush_cf(cf))
540    }
541
542    pub fn set_options_cf(
543        &self,
544        cf: &impl AsColumnFamilyRef,
545        opts: &[(&str, &str)],
546    ) -> Result<(), rocksdb::Error> {
547        delegate_call!(self.set_options_cf(cf, opts))
548    }
549
550    pub fn get_sampling_interval(&self) -> SamplingInterval {
551        match self {
552            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
553            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
554        }
555    }
556
557    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
558        match self {
559            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
560            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
561        }
562    }
563
564    pub fn write_sampling_interval(&self) -> SamplingInterval {
565        match self {
566            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
567            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
568        }
569    }
570
571    pub fn iter_sampling_interval(&self) -> SamplingInterval {
572        match self {
573            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
574            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
575        }
576    }
577
578    pub fn db_name(&self) -> String {
579        let name = match self {
580            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
581            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
582        };
583        if name.is_empty() {
584            self.default_db_name()
585        } else {
586            name.clone()
587        }
588    }
589
590    fn default_db_name(&self) -> String {
591        self.path()
592            .file_name()
593            .and_then(|f| f.to_str())
594            .unwrap_or("unknown")
595            .to_string()
596    }
597
598    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
599        delegate_call!(self.live_files())
600    }
601}
602
603// Check whether the database is marked as corrupted and, if so, return an error.
604// If the corruption marker key is not yet set, set it to [1].
605pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
606    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
607
608    db.get(DB_CORRUPTED_KEY)
609        .map_err(|e| format!("Failed to open database: {e}"))
610        .and_then(|value| match value {
611            Some(v) if v[0] == 1 => Err(
612                "Database is corrupted, please remove the current database and start clean!"
613                    .to_string(),
614            ),
615            Some(_) => Ok(()),
616            None => db
617                .put(DB_CORRUPTED_KEY, [1])
618                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
619        })?;
620
621    Ok(())
622}
623
624pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
625    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
626}
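// A possible startup flow (a sketch; the actual call sites live outside this module):
//
//     check_and_mark_db_corruption(&db_path)?; // errors if a prior run left the marker at [1]
//     // ... open the column families and run the node ...
//     unmark_db_corruption(&db_path)?;         // reset the marker to [0] after a clean shutdown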
627
628pub enum RocksDBSnapshot<'a> {
629    DBWithThreadMode(rocksdb::Snapshot<'a>),
630    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
631}
632
633impl<'a> RocksDBSnapshot<'a> {
634    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
635        &'a self,
636        keys: I,
637        readopts: ReadOptions,
638    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
639    where
640        K: AsRef<[u8]>,
641        I: IntoIterator<Item = (&'b W, K)>,
642        W: 'b + AsColumnFamilyRef,
643    {
644        match self {
645            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
646            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
647        }
648    }
649    pub fn multi_get_cf<'b: 'a, K, I, W>(
650        &'a self,
651        keys: I,
652    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
653    where
654        K: AsRef<[u8]>,
655        I: IntoIterator<Item = (&'b W, K)>,
656        W: 'b + AsColumnFamilyRef,
657    {
658        match self {
659            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
660            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
661        }
662    }
663}
664
665pub enum RocksDBBatch {
666    Regular(rocksdb::WriteBatch),
667    Transactional(rocksdb::WriteBatchWithTransaction<true>),
668}
669
670macro_rules! delegate_batch_call {
671    ($self:ident.$method:ident($($args:ident),*)) => {
672        match $self {
673            Self::Regular(b) => b.$method($($args),*),
674            Self::Transactional(b) => b.$method($($args),*),
675        }
676    }
677}
678
679impl RocksDBBatch {
680    fn size_in_bytes(&self) -> usize {
681        delegate_batch_call!(self.size_in_bytes())
682    }
683
684    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
685        delegate_batch_call!(self.delete_cf(cf, key))
686    }
687
688    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
689    where
690        K: AsRef<[u8]>,
691        V: AsRef<[u8]>,
692    {
693        delegate_batch_call!(self.put_cf(cf, key, value))
694    }
695
696    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
697    where
698        K: AsRef<[u8]>,
699        V: AsRef<[u8]>,
700    {
701        delegate_batch_call!(self.merge_cf(cf, key, value))
702    }
703
704    pub fn delete_range_cf<K: AsRef<[u8]>>(
705        &mut self,
706        cf: &impl AsColumnFamilyRef,
707        from: K,
708        to: K,
709    ) -> Result<(), TypedStoreError> {
710        match self {
711            Self::Regular(batch) => {
712                batch.delete_range_cf(cf, from, to);
713                Ok(())
714            }
715            Self::Transactional(_) => panic!(),
716        }
717    }
718}
719
720#[derive(Debug, Default)]
721pub struct MetricConf {
722    pub db_name: String,
723    pub read_sample_interval: SamplingInterval,
724    pub write_sample_interval: SamplingInterval,
725    pub iter_sample_interval: SamplingInterval,
726}
727
728impl MetricConf {
729    pub fn new(db_name: &str) -> Self {
730        if db_name.is_empty() {
731            error!("A meaningful db name should be used for metrics reporting.")
732        }
733        Self {
734            db_name: db_name.to_string(),
735            read_sample_interval: SamplingInterval::default(),
736            write_sample_interval: SamplingInterval::default(),
737            iter_sample_interval: SamplingInterval::default(),
738        }
739    }
740
741    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
742        Self {
743            db_name: self.db_name,
744            read_sample_interval: read_interval,
745            write_sample_interval: SamplingInterval::default(),
746            iter_sample_interval: SamplingInterval::default(),
747        }
748    }
749}
750const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
751const METRICS_ERROR: i64 = -1;
752
753/// An interface to a RocksDB database, keyed by a column family.
754#[derive(Clone, Debug)]
755pub struct DBMap<K, V> {
756    pub rocksdb: Arc<RocksDB>,
757    _phantom: PhantomData<fn(K) -> V>,
758    // the RocksDB column family under which the map is stored
759    cf: String,
760    pub opts: ReadWriteOptions,
761    db_metrics: Arc<DBMetrics>,
762    get_sample_interval: SamplingInterval,
763    multiget_sample_interval: SamplingInterval,
764    write_sample_interval: SamplingInterval,
765    iter_sample_interval: SamplingInterval,
766    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
767}
768
769unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
770
771impl<K, V> DBMap<K, V> {
772    pub(crate) fn new(
773        db: Arc<RocksDB>,
774        opts: &ReadWriteOptions,
775        opt_cf: &str,
776        is_deprecated: bool,
777    ) -> Self {
778        let db_cloned = db.clone();
779        let db_metrics = DBMetrics::get();
780        let db_metrics_cloned = db_metrics.clone();
781        let cf = opt_cf.to_string();
782        let (sender, mut recv) = tokio::sync::oneshot::channel();
783        if !is_deprecated {
784            tokio::task::spawn(async move {
785                let mut interval =
786                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
787                loop {
788                    tokio::select! {
789                        _ = interval.tick() => {
790                            let db = db_cloned.clone();
791                            let cf = cf.clone();
792                            let db_metrics = db_metrics.clone();
793                            if let Err(e) = tokio::task::spawn_blocking(move || {
794                                Self::report_metrics(&db, &cf, &db_metrics);
795                            }).await {
796                                error!("Failed to log metrics with error: {}", e);
797                            }
798                        }
799                        _ = &mut recv => break,
800                    }
801                }
802                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
803            });
804        }
805        DBMap {
806            rocksdb: db.clone(),
807            opts: opts.clone(),
808            _phantom: PhantomData,
809            cf: opt_cf.to_string(),
810            db_metrics: db_metrics_cloned,
811            _metrics_task_cancel_handle: Arc::new(sender),
812            get_sample_interval: db.get_sampling_interval(),
813            multiget_sample_interval: db.multiget_sampling_interval(),
814            write_sample_interval: db.write_sampling_interval(),
815            iter_sample_interval: db.iter_sampling_interval(),
816        }
817    }
818
819    /// Opens a database from a path, with specific options and an optional
820    /// column family.
821    ///
822    /// This database is used to perform operations on a single column family, and
823    /// parametrizes all operations in `DBBatch` when writing across column
824    /// families.
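    ///
    /// A minimal sketch of opening a single map (the column family name is illustrative):
    ///
    /// ```
    /// use core::fmt::Error;
    /// use tempfile::tempdir;
    /// use typed_store::rocks::*;
    /// #[tokio::main]
    /// async fn main() -> Result<(), Error> {
    ///     let _map = DBMap::<u32, String>::open(
    ///         tempdir().unwrap(),
    ///         MetricConf::default(),
    ///         None,
    ///         Some("example_cf"),
    ///         &ReadWriteOptions::default(),
    ///     )
    ///     .expect("Failed to open storage");
    ///     Ok(())
    /// }
    /// ```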
825    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
826    pub fn open<P: AsRef<Path>>(
827        path: P,
828        metric_conf: MetricConf,
829        db_options: Option<rocksdb::Options>,
830        opt_cf: Option<&str>,
831        rw_options: &ReadWriteOptions,
832    ) -> Result<Self, TypedStoreError> {
833        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
834        let cfs = vec![cf_key];
835        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
836        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
837    }
838
839    /// Reopens an open database as a typed map operating under a specific
840    /// column family. If no column family is passed, the default column
841    /// family is used.
842    ///
843    /// ```
844    /// use core::fmt::Error;
845    /// use std::sync::Arc;
846    ///
847    /// use prometheus::Registry;
848    /// use tempfile::tempdir;
849    /// use typed_store::{metrics::DBMetrics, rocks::*};
850    /// #[tokio::main]
851    /// async fn main() -> Result<(), Error> {
852    ///     // Open the DB with all needed column families first.
853    ///     let rocks = open_cf(
854    ///         tempdir().unwrap(),
855    ///         None,
856    ///         MetricConf::default(),
857    ///         &["First_CF", "Second_CF"],
858    ///     )
859    ///     .unwrap();
860    ///     // Attach the column families to specific maps.
861    ///     let db_cf_1 = DBMap::<u32, u32>::reopen(
862    ///         &rocks,
863    ///         Some("First_CF"),
864    ///         &ReadWriteOptions::default(),
865    ///         false,
866    ///     )
867    ///     .expect("Failed to open storage");
868    ///     let db_cf_2 = DBMap::<u32, u32>::reopen(
869    ///         &rocks,
870    ///         Some("Second_CF"),
871    ///         &ReadWriteOptions::default(),
872    ///         false,
873    ///     )
874    ///     .expect("Failed to open storage");
875    ///     Ok(())
876    /// }
877    /// ```
878    #[instrument(level = "debug", skip(db), err)]
879    pub fn reopen(
880        db: &Arc<RocksDB>,
881        opt_cf: Option<&str>,
882        rw_options: &ReadWriteOptions,
883        is_deprecated: bool,
884    ) -> Result<Self, TypedStoreError> {
885        let cf_key = opt_cf
886            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
887            .to_owned();
888
889        db.cf_handle(&cf_key)
890            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;
891
892        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
893    }
894
895    pub fn batch(&self) -> DBBatch {
896        let batch = match *self.rocksdb {
897            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
898            RocksDB::OptimisticTransactionDB(_) => {
899                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
900            }
901        };
902        DBBatch::new(
903            &self.rocksdb,
904            batch,
905            self.opts.writeopts(),
906            &self.db_metrics,
907            &self.write_sample_interval,
908        )
909    }
910
911    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
912        let from_buf = be_fix_int_ser(start)?;
913        let to_buf = be_fix_int_ser(end)?;
914        self.rocksdb
915            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
916        Ok(())
917    }
918
919    pub fn compact_range_raw(
920        &self,
921        cf_name: &str,
922        start: Vec<u8>,
923        end: Vec<u8>,
924    ) -> Result<(), TypedStoreError> {
925        let cf = self
926            .rocksdb
927            .cf_handle(cf_name)
928            .expect("compact range: column family does not exist");
929        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
930        Ok(())
931    }
932
933    pub fn compact_range_to_bottom<J: Serialize>(
934        &self,
935        start: &J,
936        end: &J,
937    ) -> Result<(), TypedStoreError> {
938        let from_buf = be_fix_int_ser(start)?;
939        let to_buf = be_fix_int_ser(end)?;
940        self.rocksdb
941            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
942        Ok(())
943    }
944
945    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
946        self.rocksdb
947            .cf_handle(&self.cf)
948            .expect("Map-keying column family should have been checked at DB creation")
949    }
950
951    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
952        self.rocksdb
953            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
954    }
955
956    pub fn flush(&self) -> Result<(), TypedStoreError> {
957        self.rocksdb
958            .flush_cf(&self.cf())
959            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
960    }
961
962    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
963        self.rocksdb.set_options_cf(&self.cf(), opts)
964    }
965
966    fn get_int_property(
967        rocksdb: &RocksDB,
968        cf: &impl AsColumnFamilyRef,
969        property_name: &std::ffi::CStr,
970    ) -> Result<i64, TypedStoreError> {
971        match rocksdb.property_int_value_cf(cf, property_name) {
972            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
973            Ok(None) => Ok(0),
974            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
975        }
976    }
977
978    /// Returns a vector of raw values corresponding to the keys provided.
979    fn multi_get_pinned<J>(
980        &self,
981        keys: impl IntoIterator<Item = J>,
982    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
983    where
984        J: Borrow<K>,
985        K: Serialize,
986    {
987        let _timer = self
988            .db_metrics
989            .op_metrics
990            .rocksdb_multiget_latency_seconds
991            .with_label_values(&[&self.cf])
992            .start_timer();
993        let perf_ctx = if self.multiget_sample_interval.sample() {
994            Some(RocksDBPerfContext)
995        } else {
996            None
997        };
998        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
999            .into_iter()
1000            .map(|k| be_fix_int_ser(k.borrow()))
1001            .collect();
1002        let results: Result<Vec<_>, TypedStoreError> = self
1003            .rocksdb
1004            .batched_multi_get_cf_opt(
1005                &self.cf(),
1006                keys_bytes?,
1007                // sorted_input=
1008                false,
1009                &self.opts.readopts(),
1010            )
1011            .into_iter()
1012            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
1013            .collect();
1014        let entries = results?;
1015        let entry_size = entries
1016            .iter()
1017            .flatten()
1018            .map(|entry| entry.len())
1019            .sum::<usize>();
1020        self.db_metrics
1021            .op_metrics
1022            .rocksdb_multiget_bytes
1023            .with_label_values(&[&self.cf])
1024            .observe(entry_size as f64);
1025        if perf_ctx.is_some() {
1026            self.db_metrics
1027                .read_perf_ctx_metrics
1028                .report_metrics(&self.cf);
1029        }
1030        Ok(entries)
1031    }
1032
1033    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
1034        let Some(cf) = rocksdb.cf_handle(cf_name) else {
1035            tracing::warn!(
1036                "unable to report metrics for cf {cf_name:?} in db {:?}",
1037                rocksdb.db_name()
1038            );
1039            return;
1040        };
1041
1042        db_metrics
1043            .cf_metrics
1044            .rocksdb_total_sst_files_size
1045            .with_label_values(&[cf_name])
1046            .set(
1047                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
1048                    .unwrap_or(METRICS_ERROR),
1049            );
1050        db_metrics
1051            .cf_metrics
1052            .rocksdb_total_blob_files_size
1053            .with_label_values(&[cf_name])
1054            .set(
1055                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
1056                    .unwrap_or(METRICS_ERROR),
1057            );
1058        // 7 is the default number of levels in RocksDB. If we ever change the number of
1059        // levels using `set_num_levels`, we need to update here as well. Note
1060        // that there isn't an API to query the DB to get the number of levels (yet).
1061        let total_num_files: i64 = (0..=6)
1062            .map(|level| {
1063                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
1064                    .unwrap_or(METRICS_ERROR)
1065            })
1066            .sum();
1067        db_metrics
1068            .cf_metrics
1069            .rocksdb_total_num_files
1070            .with_label_values(&[cf_name])
1071            .set(total_num_files);
1072        db_metrics
1073            .cf_metrics
1074            .rocksdb_num_level0_files
1075            .with_label_values(&[cf_name])
1076            .set(
1077                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
1078                    .unwrap_or(METRICS_ERROR),
1079            );
1080        db_metrics
1081            .cf_metrics
1082            .rocksdb_current_size_active_mem_tables
1083            .with_label_values(&[cf_name])
1084            .set(
1085                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
1086                    .unwrap_or(METRICS_ERROR),
1087            );
1088        db_metrics
1089            .cf_metrics
1090            .rocksdb_size_all_mem_tables
1091            .with_label_values(&[cf_name])
1092            .set(
1093                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
1094                    .unwrap_or(METRICS_ERROR),
1095            );
1096        db_metrics
1097            .cf_metrics
1098            .rocksdb_num_snapshots
1099            .with_label_values(&[cf_name])
1100            .set(
1101                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
1102                    .unwrap_or(METRICS_ERROR),
1103            );
1104        db_metrics
1105            .cf_metrics
1106            .rocksdb_oldest_snapshot_time
1107            .with_label_values(&[cf_name])
1108            .set(
1109                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
1110                    .unwrap_or(METRICS_ERROR),
1111            );
1112        db_metrics
1113            .cf_metrics
1114            .rocksdb_actual_delayed_write_rate
1115            .with_label_values(&[cf_name])
1116            .set(
1117                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
1118                    .unwrap_or(METRICS_ERROR),
1119            );
1120        db_metrics
1121            .cf_metrics
1122            .rocksdb_is_write_stopped
1123            .with_label_values(&[cf_name])
1124            .set(
1125                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
1126                    .unwrap_or(METRICS_ERROR),
1127            );
1128        db_metrics
1129            .cf_metrics
1130            .rocksdb_block_cache_capacity
1131            .with_label_values(&[cf_name])
1132            .set(
1133                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
1134                    .unwrap_or(METRICS_ERROR),
1135            );
1136        db_metrics
1137            .cf_metrics
1138            .rocksdb_block_cache_usage
1139            .with_label_values(&[cf_name])
1140            .set(
1141                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
1142                    .unwrap_or(METRICS_ERROR),
1143            );
1144        db_metrics
1145            .cf_metrics
1146            .rocksdb_block_cache_pinned_usage
1147            .with_label_values(&[cf_name])
1148            .set(
1149                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
1150                    .unwrap_or(METRICS_ERROR),
1151            );
1152        db_metrics
1153            .cf_metrics
1154            .rocksdb_estimate_table_readers_mem
1155            .with_label_values(&[cf_name])
1156            .set(
1157                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
1158                    .unwrap_or(METRICS_ERROR),
1159            );
1160        db_metrics
1161            .cf_metrics
1162            .rocksdb_estimated_num_keys
1163            .with_label_values(&[cf_name])
1164            .set(
1165                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
1166                    .unwrap_or(METRICS_ERROR),
1167            );
1168        db_metrics
1169            .cf_metrics
1170            .rocksdb_num_immutable_mem_tables
1171            .with_label_values(&[cf_name])
1172            .set(
1173                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
1174                    .unwrap_or(METRICS_ERROR),
1175            );
1176        db_metrics
1177            .cf_metrics
1178            .rocksdb_mem_table_flush_pending
1179            .with_label_values(&[cf_name])
1180            .set(
1181                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
1182                    .unwrap_or(METRICS_ERROR),
1183            );
1184        db_metrics
1185            .cf_metrics
1186            .rocksdb_compaction_pending
1187            .with_label_values(&[cf_name])
1188            .set(
1189                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
1190                    .unwrap_or(METRICS_ERROR),
1191            );
1192        db_metrics
1193            .cf_metrics
1194            .rocksdb_estimate_pending_compaction_bytes
1195            .with_label_values(&[cf_name])
1196            .set(
1197                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
1198                    .unwrap_or(METRICS_ERROR),
1199            );
1200        db_metrics
1201            .cf_metrics
1202            .rocksdb_num_running_compactions
1203            .with_label_values(&[cf_name])
1204            .set(
1205                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1206                    .unwrap_or(METRICS_ERROR),
1207            );
1208        db_metrics
1209            .cf_metrics
1210            .rocksdb_num_running_flushes
1211            .with_label_values(&[cf_name])
1212            .set(
1213                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1214                    .unwrap_or(METRICS_ERROR),
1215            );
1216        db_metrics
1217            .cf_metrics
1218            .rocksdb_estimate_oldest_key_time
1219            .with_label_values(&[cf_name])
1220            .set(
1221                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1222                    .unwrap_or(METRICS_ERROR),
1223            );
1224        db_metrics
1225            .cf_metrics
1226            .rocksdb_background_errors
1227            .with_label_values(&[cf_name])
1228            .set(
1229                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1230                    .unwrap_or(METRICS_ERROR),
1231            );
1232        db_metrics
1233            .cf_metrics
1234            .rocksdb_base_level
1235            .with_label_values(&[cf_name])
1236            .set(
1237                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1238                    .unwrap_or(METRICS_ERROR),
1239            );
1240    }
1241
1242    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1243        DBTransaction::new(&self.rocksdb)
1244    }
1245
1246    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1247        DBTransaction::new_without_snapshot(&self.rocksdb)
1248    }
1249
1250    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1251        self.rocksdb.checkpoint(path)
1252    }
1253
1254    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
1255        Ok(self.rocksdb.snapshot())
1256    }
1257
1258    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
1259        let mut num_keys = 0;
1260        let mut key_bytes_total = 0;
1261        let mut value_bytes_total = 0;
1262        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1263        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1264        let iter = self.iterator_cf().map(Result::unwrap);
1265        for (key, value) in iter {
1266            num_keys += 1;
1267            key_bytes_total += key.len();
1268            value_bytes_total += value.len();
1269            key_hist.record(key.len() as u64)?;
1270            value_hist.record(value.len() as u64)?;
1271        }
1272        Ok(TableSummary {
1273            num_keys,
1274            key_bytes_total,
1275            value_bytes_total,
1276            key_hist,
1277            value_hist,
1278        })
1279    }
1280
1281    // Creates metrics and context for tracking iterator usage and performance.
1282    fn create_iter_context(
1283        &self,
1284    ) -> (
1285        Option<HistogramTimer>,
1286        Option<Histogram>,
1287        Option<Histogram>,
1288        Option<RocksDBPerfContext>,
1289    ) {
1290        let timer = self
1291            .db_metrics
1292            .op_metrics
1293            .rocksdb_iter_latency_seconds
1294            .with_label_values(&[&self.cf])
1295            .start_timer();
1296        let bytes_scanned = self
1297            .db_metrics
1298            .op_metrics
1299            .rocksdb_iter_bytes
1300            .with_label_values(&[&self.cf]);
1301        let keys_scanned = self
1302            .db_metrics
1303            .op_metrics
1304            .rocksdb_iter_keys
1305            .with_label_values(&[&self.cf]);
1306        let perf_ctx = if self.iter_sample_interval.sample() {
1307            Some(RocksDBPerfContext)
1308        } else {
1309            None
1310        };
1311        (
1312            Some(timer),
1313            Some(bytes_scanned),
1314            Some(keys_scanned),
1315            perf_ctx,
1316        )
1317    }
1318
1319    // Creates a RocksDB read option with the specified lower and upper bounds.
1320    // The lower bound is inclusive and the upper bound is exclusive.
1321    fn create_read_options_with_bounds(
1322        &self,
1323        lower_bound: Option<K>,
1324        upper_bound: Option<K>,
1325    ) -> ReadOptions
1326    where
1327        K: Serialize,
1328    {
1329        let mut readopts = self.opts.readopts();
1330        if let Some(lower_bound) = lower_bound {
1331            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
1332            readopts.set_iterate_lower_bound(key_buf);
1333        }
1334        if let Some(upper_bound) = upper_bound {
1335            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
1336            readopts.set_iterate_upper_bound(key_buf);
1337        }
1338        readopts
1339    }
1340
1341    // Creates a RocksDB read option with lower and upper bounds set corresponding
1342    // to `range`.
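    //
    // For illustration (keys use the fixed-width big-endian encoding from
    // `be_fix_int_ser`): the range `1..=5` becomes lower bound ser(1) and upper bound
    // ser(5) incremented via `big_endian_saturating_add_one`, because RocksDB upper
    // bounds are exclusive, whereas `1..5` becomes lower bound ser(1) and upper bound
    // ser(5) unchanged.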
1343    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
1344    where
1345        K: Serialize,
1346    {
1347        let mut readopts = self.opts.readopts();
1348
1349        let lower_bound = range.start_bound();
1350        let upper_bound = range.end_bound();
1351
1352        match lower_bound {
1353            Bound::Included(lower_bound) => {
1354                // Rocksdb lower bound is inclusive by default so nothing to do
1355                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1356                readopts.set_iterate_lower_bound(key_buf);
1357            }
1358            Bound::Excluded(lower_bound) => {
1359                let mut key_buf =
1360                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1361
1362                // The lower bound should be exclusive, so increment the key to skip `lower_bound` itself
1363                big_endian_saturating_add_one(&mut key_buf);
1364                readopts.set_iterate_lower_bound(key_buf);
1365            }
1366            Bound::Unbounded => (),
1367        };
1368
1369        match upper_bound {
1370            Bound::Included(upper_bound) => {
1371                let mut key_buf =
1372                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1373
1374                // If the key is already at the limit, there's nowhere else to go, so no upper
1375                // bound
1376                if !is_max(&key_buf) {
1377                    // RocksDB upper bounds are exclusive, so increment the key to keep `upper_bound` in range
1378                    big_endian_saturating_add_one(&mut key_buf);
1379                    readopts.set_iterate_upper_bound(key_buf);
1380                }
1381            }
1382            Bound::Excluded(upper_bound) => {
1383                // RocksDB upper bounds are exclusive by default, so nothing more to do
1384                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1385                readopts.set_iterate_upper_bound(key_buf);
1386            }
1387            Bound::Unbounded => (),
1388        };
1389
1390        readopts
1391    }
1392}
1393
1394/// Provides a mutable struct to form a collection of database write operations,
1395/// and execute them.
1396///
1397/// Batching write and delete operations is faster than performing them one by
1398/// one and ensures their atomicity, i.e. either all of them are written or none is.
1399/// This also holds for operations across column families in the same database.
1400///
1401/// Serialization/deserialization and the naming of column families are handled
1402/// by passing a DBMap<K, V> with each operation.
1403///
1404/// ```
1405/// use core::fmt::Error;
1406/// use std::sync::Arc;
1407///
1408/// use prometheus::Registry;
1409/// use tempfile::tempdir;
1410/// use typed_store::{Map, metrics::DBMetrics, rocks::*};
1411///
1412/// #[tokio::main]
1413/// async fn main() -> Result<(), Error> {
1414///     let rocks = open_cf(
1415///         tempfile::tempdir().unwrap(),
1416///         None,
1417///         MetricConf::default(),
1418///         &["First_CF", "Second_CF"],
1419///     )
1420///     .unwrap();
1421///
1422///     let db_cf_1 = DBMap::reopen(
1423///         &rocks,
1424///         Some("First_CF"),
1425///         &ReadWriteOptions::default(),
1426///         false,
1427///     )
1428///     .expect("Failed to open storage");
1429///     let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1430///
1431///     let db_cf_2 = DBMap::reopen(
1432///         &rocks,
1433///         Some("Second_CF"),
1434///         &ReadWriteOptions::default(),
1435///         false,
1436///     )
1437///     .expect("Failed to open storage");
1438///     let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1439///
1440///     let mut batch = db_cf_1.batch();
1441///     batch
1442///         .insert_batch(&db_cf_1, keys_vals_1.clone())
1443///         .expect("Failed to batch insert")
1444///         .insert_batch(&db_cf_2, keys_vals_2.clone())
1445///         .expect("Failed to batch insert");
1446///
1447///     let _ = batch.write().expect("Failed to execute batch");
1448///     for (k, v) in keys_vals_1 {
1449///         let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1450///         assert_eq!(Some(v), val);
1451///     }
1452///
1453///     for (k, v) in keys_vals_2 {
1454///         let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1455///         assert_eq!(Some(v), val);
1456///     }
1457///     Ok(())
1458/// }
1459/// ```
1460pub struct DBBatch {
1461    rocksdb: Arc<RocksDB>,
1462    batch: RocksDBBatch,
1463    opts: WriteOptions,
1464    db_metrics: Arc<DBMetrics>,
1465    write_sample_interval: SamplingInterval,
1466}
1467
1468impl DBBatch {
1469    /// Create a new batch associated with a DB reference.
1470    ///
1471    /// Use `open_cf` to obtain the DB reference, or reuse an already open database.
1472    pub fn new(
1473        dbref: &Arc<RocksDB>,
1474        batch: RocksDBBatch,
1475        opts: WriteOptions,
1476        db_metrics: &Arc<DBMetrics>,
1477        write_sample_interval: &SamplingInterval,
1478    ) -> Self {
1479        DBBatch {
1480            rocksdb: dbref.clone(),
1481            batch,
1482            opts,
1483            db_metrics: db_metrics.clone(),
1484            write_sample_interval: write_sample_interval.clone(),
1485        }
1486    }
1487
1488    /// Consume the batch and write its operations to the database
1489    #[instrument(level = "trace", skip_all, err)]
1490    pub fn write(self) -> Result<(), TypedStoreError> {
1491        let db_name = self.rocksdb.db_name();
1492        let timer = self
1493            .db_metrics
1494            .op_metrics
1495            .rocksdb_batch_commit_latency_seconds
1496            .with_label_values(&[&db_name])
1497            .start_timer();
1498        let batch_size = self.batch.size_in_bytes();
1499
1500        let perf_ctx = if self.write_sample_interval.sample() {
1501            Some(RocksDBPerfContext)
1502        } else {
1503            None
1504        };
1505        self.rocksdb.write(self.batch, &self.opts)?;
1506        self.db_metrics
1507            .op_metrics
1508            .rocksdb_batch_commit_bytes
1509            .with_label_values(&[&db_name])
1510            .observe(batch_size as f64);
1511
1512        if perf_ctx.is_some() {
1513            self.db_metrics
1514                .write_perf_ctx_metrics
1515                .report_metrics(&db_name);
1516        }
1517        let elapsed = timer.stop_and_record();
1518        if elapsed > 1.0 {
1519            warn!(?elapsed, ?db_name, "very slow batch write");
1520            self.db_metrics
1521                .op_metrics
1522                .rocksdb_very_slow_batch_writes_count
1523                .with_label_values(&[&db_name])
1524                .inc();
1525            self.db_metrics
1526                .op_metrics
1527                .rocksdb_very_slow_batch_writes_duration_ms
1528                .with_label_values(&[&db_name])
1529                .inc_by((elapsed * 1000.0) as u64);
1530        }
1531        Ok(())
1532    }
1533
1534    pub fn size_in_bytes(&self) -> usize {
1535        self.batch.size_in_bytes()
1536    }
1537}
1538
1539impl DBBatch {
1540    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1541        &mut self,
1542        db: &DBMap<K, V>,
1543        purged_vals: impl IntoIterator<Item = J>,
1544    ) -> Result<(), TypedStoreError> {
1545        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1546            return Err(TypedStoreError::CrossDBBatch);
1547        }
1548
1549        purged_vals
1550            .into_iter()
1551            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1552                let k_buf = be_fix_int_ser(k.borrow())?;
1553                self.batch.delete_cf(&db.cf(), k_buf);
1554
1555                Ok(())
1556            })?;
1557        Ok(())
1558    }
1559
1560    /// Deletes a range of keys between `from` (inclusive) and `to`
1561    /// (exclusive) by writing a range delete tombstone into the db map.
1562    /// If the DBMap is configured with ignore_range_deletions set to false,
1563    /// the effect of this write is visible immediately, i.e. you won't
1564    /// see the deleted values on a lookup or scan. But if it is configured
1565    /// with ignore_range_deletions set to true, the old values remain visible
1566    /// until compaction actually deletes them, which happens some time later.
1567    /// By default ignore_range_deletions is set to true on a DBMap (unless it
1568    /// is overridden in the config), so please use this function with caution.
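    ///
    /// A minimal usage sketch (not a compiled doctest; assumes a `db: DBMap<u32, String>`
    /// that is already open):
    ///
    /// ```ignore
    /// let mut batch = db.batch();
    /// // Schedule deletion of every key in the range [10, 20).
    /// batch.schedule_delete_range(&db, &10, &20)?;
    /// batch.write()?;
    /// ```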
1569    pub fn schedule_delete_range<K: Serialize, V>(
1570        &mut self,
1571        db: &DBMap<K, V>,
1572        from: &K,
1573        to: &K,
1574    ) -> Result<(), TypedStoreError> {
1575        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1576            return Err(TypedStoreError::CrossDBBatch);
1577        }
1578
1579        let from_buf = be_fix_int_ser(from)?;
1580        let to_buf = be_fix_int_ser(to)?;
1581
1582        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
1583        Ok(())
1584    }
1585
1586    /// Inserts a range of (key, value) pairs given as an iterator.
1587    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1588        &mut self,
1589        db: &DBMap<K, V>,
1590        new_vals: impl IntoIterator<Item = (J, U)>,
1591    ) -> Result<&mut Self, TypedStoreError> {
1592        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1593            return Err(TypedStoreError::CrossDBBatch);
1594        }
1595        let mut total = 0usize;
1596        new_vals
1597            .into_iter()
1598            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1599                let k_buf = be_fix_int_ser(k.borrow())?;
1600                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1601                total += k_buf.len() + v_buf.len();
1602                self.batch.put_cf(&db.cf(), k_buf, v_buf);
1603                Ok(())
1604            })?;
1605        self.db_metrics
1606            .op_metrics
1607            .rocksdb_batch_put_bytes
1608            .with_label_values(&[&db.cf])
1609            .observe(total as f64);
1610        Ok(self)
1611    }
1612
1613    /// Merges a range of (key, value) pairs given as an iterator.
1614    pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1615        &mut self,
1616        db: &DBMap<K, V>,
1617        new_vals: impl IntoIterator<Item = (J, U)>,
1618    ) -> Result<&mut Self, TypedStoreError> {
1619        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1620            return Err(TypedStoreError::CrossDBBatch);
1621        }
1622
1623        new_vals
1624            .into_iter()
1625            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1626                let k_buf = be_fix_int_ser(k.borrow())?;
1627                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1628                self.batch.merge_cf(&db.cf(), k_buf, v_buf);
1629                Ok(())
1630            })?;
1631        Ok(self)
1632    }
1633
1634    /// Similar to `merge_batch`, but allows merging with partial values.
1635    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
1636        &mut self,
1637        db: &DBMap<K, V>,
1638        new_vals: impl IntoIterator<Item = (J, B)>,
1639    ) -> Result<&mut Self, TypedStoreError> {
1640        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1641            return Err(TypedStoreError::CrossDBBatch);
1642        }
1643        new_vals
1644            .into_iter()
1645            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1646                let k_buf = be_fix_int_ser(k.borrow())?;
1647                self.batch.merge_cf(&db.cf(), k_buf, v);
1648                Ok(())
1649            })?;
1650        Ok(self)
1651    }
1652}
1653
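/// A typed wrapper around a RocksDB optimistic transaction.
///
/// Reads and writes go through the wrapped [`Transaction`]; conflicts with
/// concurrent writers are detected at [`DBTransaction::commit`] time rather
/// than by taking locks up front.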
1654pub struct DBTransaction<'a> {
1655    rocksdb: Arc<RocksDB>,
1656    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
1657}
1658
1659impl<'a> DBTransaction<'a> {
1660    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1661        Ok(Self {
1662            rocksdb: db.clone(),
1663            transaction: db.transaction()?,
1664        })
1665    }
1666
1667    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1668        Ok(Self {
1669            rocksdb: db.clone(),
1670            transaction: db.transaction_without_snapshot()?,
1671        })
1672    }
1673
1674    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1675        &mut self,
1676        db: &DBMap<K, V>,
1677        new_vals: impl IntoIterator<Item = (J, U)>,
1678    ) -> Result<&mut Self, TypedStoreError> {
1679        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1680            return Err(TypedStoreError::CrossDBBatch);
1681        }
1682
1683        new_vals
1684            .into_iter()
1685            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1686                let k_buf = be_fix_int_ser(k.borrow())?;
1687                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1688                self.transaction
1689                    .put_cf(&db.cf(), k_buf, v_buf)
1690                    .map_err(typed_store_err_from_rocks_err)?;
1691                Ok(())
1692            })?;
1693        Ok(self)
1694    }
1695
1696    /// Deletes a set of keys given as an iterator
1697    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1698        &mut self,
1699        db: &DBMap<K, V>,
1700        purged_vals: impl IntoIterator<Item = J>,
1701    ) -> Result<&mut Self, TypedStoreError> {
1702        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1703            return Err(TypedStoreError::CrossDBBatch);
1704        }
1705        purged_vals
1706            .into_iter()
1707            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1708                let k_buf = be_fix_int_ser(k.borrow())?;
1709                self.transaction
1710                    .delete_cf(&db.cf(), k_buf)
1711                    .map_err(typed_store_err_from_rocks_err)?;
1712                Ok(())
1713            })?;
1714        Ok(self)
1715    }
1716
1717    pub fn snapshot(
1718        &self,
1719    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
1720    {
1721        self.transaction.snapshot()
1722    }
1723
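    /// Reads the value for `key` and records the key in the transaction's
    /// conflict-detection set, so a concurrent write to the same key causes
    /// [`DBTransaction::commit`] to fail with a retryable error.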
1724    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
1725        &self,
1726        db: &DBMap<K, V>,
1727        key: &K,
1728    ) -> Result<Option<V>, TypedStoreError> {
1729        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1730            return Err(TypedStoreError::CrossDBBatch);
1731        }
1732        let k_buf = be_fix_int_ser(key)?;
1733        match self
1734            .transaction
1735            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
1736            .map_err(typed_store_err_from_rocks_err)?
1737        {
1738            Some(data) => Ok(Some(
1739                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1740            )),
1741            None => Ok(None),
1742        }
1743    }
1744
1745    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
1746        &self,
1747        db: &DBMap<K, V>,
1748        key: &K,
1749    ) -> Result<Option<V>, TypedStoreError> {
1750        let key_buf = be_fix_int_ser(key)?;
1751        self.transaction
1752            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
1753            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
1754            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
1755    }
1756
1757    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
1758        &self,
1759        db: &DBMap<K, V>,
1760        keys: impl IntoIterator<Item = J>,
1761    ) -> Result<Vec<Option<V>>, TypedStoreError> {
1762        let cf = db.cf();
1763        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
1764            .into_iter()
1765            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
1766            .collect();
1767
1768        let results = self
1769            .transaction
1770            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());
1771
1772        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1773            .into_iter()
1774            .map(
1775                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
1776                    Some(data) => Ok(Some(
1777                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1778                    )),
1779                    None => Ok(None),
1780                },
1781            )
1782            .collect();
1783
1784        values_parsed
1785    }
1786
1787    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
1788        &'a self,
1789        db: &DBMap<K, V>,
1790    ) -> Iter<'a, K, V> {
1791        let db_iter = self
1792            .transaction
1793            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
1794        Iter::new(
1795            db.cf.clone(),
1796            RocksDBRawIter::OptimisticTransaction(db_iter),
1797            None,
1798            None,
1799            None,
1800            None,
1801            None,
1802        )
1803    }
1804
1805    pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
1806        &'a self,
1807        db: &DBMap<K, V>,
1808    ) -> Keys<'a, K> {
1809        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1810            self.transaction
1811                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1812        );
1813        db_iter.seek_to_first();
1814
1815        Keys::new(db_iter)
1816    }
1817
1818    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
1819        &'a self,
1820        db: &DBMap<K, V>,
1821    ) -> Values<'a, V> {
1822        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1823            self.transaction
1824                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1825        );
1826        db_iter.seek_to_first();
1827
1828        Values::new(db_iter)
1829    }
1830
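    /// Commits the transaction, applying all buffered operations atomically.
    ///
    /// On a write conflict this returns `TypedStoreError::RetryableTransaction`,
    /// so callers typically rebuild and retry the whole transaction. A minimal
    /// sketch (not a compiled doctest; `build_txn` is a hypothetical helper that
    /// creates the transaction and buffers its writes):
    ///
    /// ```ignore
    /// loop {
    ///     let txn = build_txn()?;
    ///     match txn.commit() {
    ///         Ok(()) => break,
    ///         Err(TypedStoreError::RetryableTransaction) => continue, // conflict, retry
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// ```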
1831    pub fn commit(self) -> Result<(), TypedStoreError> {
1832        fail_point!("transaction-commit");
1833        self.transaction.commit().map_err(|e| match e.kind() {
1834            // empirically, this is what you get when there is a write conflict. it is not
1835            // documented whether this is the only time you can get this error.
1836            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
1837            _ => typed_store_err_from_rocks_err(e),
1838        })?;
1839        Ok(())
1840    }
1841}
1842
1843macro_rules! delegate_iter_call {
1844    ($self:ident.$method:ident($($args:ident),*)) => {
1845        match $self {
1846            Self::DB(db) => db.$method($($args),*),
1847            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
1848            Self::OptimisticTransaction(db) => db.$method($($args),*),
1849        }
1850    }
1851}
1852
1853pub enum RocksDBRawIter<'a> {
1854    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1855    OptimisticTransactionDB(
1856        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1857    ),
1858    OptimisticTransaction(
1859        rocksdb::DBRawIteratorWithThreadMode<
1860            'a,
1861            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1862        >,
1863    ),
1864}
1865
1866impl RocksDBRawIter<'_> {
1867    pub fn valid(&self) -> bool {
1868        delegate_iter_call!(self.valid())
1869    }
1870    pub fn key(&self) -> Option<&[u8]> {
1871        delegate_iter_call!(self.key())
1872    }
1873    pub fn value(&self) -> Option<&[u8]> {
1874        delegate_iter_call!(self.value())
1875    }
1876    pub fn next(&mut self) {
1877        delegate_iter_call!(self.next())
1878    }
1879    pub fn prev(&mut self) {
1880        delegate_iter_call!(self.prev())
1881    }
1882    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
1883        delegate_iter_call!(self.seek(key))
1884    }
1885    pub fn seek_to_last(&mut self) {
1886        delegate_iter_call!(self.seek_to_last())
1887    }
1888    pub fn seek_to_first(&mut self) {
1889        delegate_iter_call!(self.seek_to_first())
1890    }
1891    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
1892        delegate_iter_call!(self.seek_for_prev(key))
1893    }
1894    pub fn status(&self) -> Result<(), rocksdb::Error> {
1895        delegate_iter_call!(self.status())
1896    }
1897}
1898
1899pub enum RocksDBIter<'a> {
1900    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1901    OptimisticTransactionDB(
1902        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1903    ),
1904}
1905
1906impl Iterator for RocksDBIter<'_> {
1907    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
1908    fn next(&mut self) -> Option<Self::Item> {
1909        match self {
1910            Self::DB(db) => db.next(),
1911            Self::OptimisticTransactionDB(db) => db.next(),
1912        }
1913    }
1914}
1915
1916impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1917where
1918    K: Serialize + DeserializeOwned,
1919    V: Serialize + DeserializeOwned,
1920{
1921    type Error = TypedStoreError;
1922    type Iterator = Iter<'a, K, V>;
1923    type SafeIterator = SafeIter<'a, K, V>;
1924    type Keys = Keys<'a, K>;
1925    type Values = Values<'a, V>;
1926
1927    #[instrument(level = "trace", skip_all, err)]
1928    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1929        let key_buf = be_fix_int_ser(key)?;
1930        // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
1931        // but no false negatives. We use it to short-circuit the absent case
1932        let readopts = self.opts.readopts();
1933        Ok(self
1934            .rocksdb
1935            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1936            && self
1937                .rocksdb
1938                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1939                .map_err(typed_store_err_from_rocks_err)?
1940                .is_some())
1941    }
1942
1943    #[instrument(level = "trace", skip_all, err)]
1944    fn multi_contains_keys<J>(
1945        &self,
1946        keys: impl IntoIterator<Item = J>,
1947    ) -> Result<Vec<bool>, Self::Error>
1948    where
1949        J: Borrow<K>,
1950    {
1951        let values = self.multi_get_pinned(keys)?;
1952        Ok(values.into_iter().map(|v| v.is_some()).collect())
1953    }
1954
1955    #[instrument(level = "trace", skip_all, err)]
1956    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1957        let _timer = self
1958            .db_metrics
1959            .op_metrics
1960            .rocksdb_get_latency_seconds
1961            .with_label_values(&[&self.cf])
1962            .start_timer();
1963        let perf_ctx = if self.get_sample_interval.sample() {
1964            Some(RocksDBPerfContext)
1965        } else {
1966            None
1967        };
1968        let key_buf = be_fix_int_ser(key)?;
1969        let res = self
1970            .rocksdb
1971            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1972            .map_err(typed_store_err_from_rocks_err)?;
1973        self.db_metrics
1974            .op_metrics
1975            .rocksdb_get_bytes
1976            .with_label_values(&[&self.cf])
1977            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1978        if perf_ctx.is_some() {
1979            self.db_metrics
1980                .read_perf_ctx_metrics
1981                .report_metrics(&self.cf);
1982        }
1983        match res {
1984            Some(data) => Ok(Some(
1985                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1986            )),
1987            None => Ok(None),
1988        }
1989    }
1990
1991    #[instrument(level = "trace", skip_all, err)]
1992    fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
1993        let _timer = self
1994            .db_metrics
1995            .op_metrics
1996            .rocksdb_get_latency_seconds
1997            .with_label_values(&[&self.cf])
1998            .start_timer();
1999        let perf_ctx = if self.get_sample_interval.sample() {
2000            Some(RocksDBPerfContext)
2001        } else {
2002            None
2003        };
2004        let key_buf = be_fix_int_ser(key)?;
2005        let res = self
2006            .rocksdb
2007            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
2008            .map_err(typed_store_err_from_rocks_err)?;
2009        self.db_metrics
2010            .op_metrics
2011            .rocksdb_get_bytes
2012            .with_label_values(&[&self.cf])
2013            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
2014        if perf_ctx.is_some() {
2015            self.db_metrics
2016                .read_perf_ctx_metrics
2017                .report_metrics(&self.cf);
2018        }
2019        match res {
2020            Some(data) => Ok(Some(data.to_vec())),
2021            None => Ok(None),
2022        }
2023    }
2024
2025    #[instrument(level = "trace", skip_all, err)]
2026    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
2027        let timer = self
2028            .db_metrics
2029            .op_metrics
2030            .rocksdb_put_latency_seconds
2031            .with_label_values(&[&self.cf])
2032            .start_timer();
2033        let perf_ctx = if self.write_sample_interval.sample() {
2034            Some(RocksDBPerfContext)
2035        } else {
2036            None
2037        };
2038        let key_buf = be_fix_int_ser(key)?;
2039        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
2040        self.db_metrics
2041            .op_metrics
2042            .rocksdb_put_bytes
2043            .with_label_values(&[&self.cf])
2044            .observe((key_buf.len() + value_buf.len()) as f64);
2045        if perf_ctx.is_some() {
2046            self.db_metrics
2047                .write_perf_ctx_metrics
2048                .report_metrics(&self.cf);
2049        }
2050        self.rocksdb
2051            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
2052            .map_err(typed_store_err_from_rocks_err)?;
2053
2054        let elapsed = timer.stop_and_record();
2055        if elapsed > 1.0 {
2056            warn!(?elapsed, cf = ?self.cf, "very slow insert");
2057            self.db_metrics
2058                .op_metrics
2059                .rocksdb_very_slow_puts_count
2060                .with_label_values(&[&self.cf])
2061                .inc();
2062            self.db_metrics
2063                .op_metrics
2064                .rocksdb_very_slow_puts_duration_ms
2065                .with_label_values(&[&self.cf])
2066                .inc_by((elapsed * 1000.0) as u64);
2067        }
2068
2069        Ok(())
2070    }
2071
2072    #[instrument(level = "trace", skip_all, err)]
2073    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
2074        let _timer = self
2075            .db_metrics
2076            .op_metrics
2077            .rocksdb_delete_latency_seconds
2078            .with_label_values(&[&self.cf])
2079            .start_timer();
2080        let perf_ctx = if self.write_sample_interval.sample() {
2081            Some(RocksDBPerfContext)
2082        } else {
2083            None
2084        };
2085        let key_buf = be_fix_int_ser(key)?;
2086        self.rocksdb
2087            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
2088            .map_err(typed_store_err_from_rocks_err)?;
2089        self.db_metrics
2090            .op_metrics
2091            .rocksdb_deletes
2092            .with_label_values(&[&self.cf])
2093            .inc();
2094        if perf_ctx.is_some() {
2095            self.db_metrics
2096                .write_perf_ctx_metrics
2097                .report_metrics(&self.cf);
2098        }
2099        Ok(())
2100    }
2101
2102    /// Deletes a range of keys between `from` (inclusive) and `to`
2103    /// (exclusive) by immediately deleting any sst files whose key
2104    /// range falls entirely within the range. Files whose range only
2105    /// partially overlaps with the range are not deleted. This can be
2106    /// useful for quickly removing a large amount of data without having
2107    /// to delete individual keys. Only files at level 1 or higher are
2108    /// considered (level 0 files are skipped). It doesn't guarantee that
2109    /// all keys in the range are deleted, as there might be keys in files
2110    /// that weren't entirely within the range.
2111    #[instrument(level = "trace", skip_all, err)]
2112    fn delete_file_in_range(&self, from: &K, to: &K) -> Result<(), TypedStoreError> {
2113        let from_buf = be_fix_int_ser(from.borrow())?;
2114        let to_buf = be_fix_int_ser(to.borrow())?;
2115        self.rocksdb
2116            .delete_file_in_range(&self.cf(), from_buf, to_buf)
2117            .map_err(typed_store_err_from_rocks_err)?;
2118        Ok(())
2119    }
2120
2121    /// This method first drops the existing column family and then creates a
2122    /// new one with the same name. The two operations are not atomic, so it
2123    /// is possible to hit a race condition where the column family has been
2124    /// dropped but the new one has not been created yet.
2125    #[instrument(level = "trace", skip_all, err)]
2126    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
2127        let _ = self.rocksdb.drop_cf(&self.cf);
2128        self.rocksdb
2129            .create_cf(self.cf.clone(), &default_db_options().options)
2130            .map_err(typed_store_err_from_rocks_err)?;
2131        Ok(())
2132    }
2133
2134    /// Writes a range delete tombstone to delete all entries in the db map.
2135    /// If the DBMap is configured with ignore_range_deletions set to false,
2136    /// the effect of this write is visible immediately, i.e. you won't
2137    /// see the deleted values on a lookup or scan. But if it is configured
2138    /// with ignore_range_deletions set to true, the old values remain visible
2139    /// until compaction actually deletes them, which happens some time later.
2140    /// By default ignore_range_deletions is set to true on a DBMap (unless it
2141    /// is overridden in the config), so please use this function with caution.
2142    #[instrument(level = "trace", skip_all, err)]
2143    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2144        let mut iter = self.unbounded_iter().seek_to_first();
2145        let first_key = iter.next().map(|(k, _v)| k);
2146        let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
2147        if let Some((first_key, last_key)) = first_key.zip(last_key) {
2148            let mut batch = self.batch();
2149            batch.schedule_delete_range(self, &first_key, &last_key)?;
2150            batch.write()?;
2151        }
2152        Ok(())
2153    }
2154
2155    fn is_empty(&self) -> bool {
2156        self.safe_iter().next().is_none()
2157    }
2158
2159    /// Returns an unbounded iterator visiting each key-value pair in the map.
2160    /// This is potentially unsafe as it can perform a full table scan
2161    fn unbounded_iter(&'a self) -> Self::Iterator {
2162        let db_iter = self
2163            .rocksdb
2164            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2165        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2166        Iter::new(
2167            self.cf.clone(),
2168            db_iter,
2169            _timer,
2170            _perf_ctx,
2171            bytes_scanned,
2172            keys_scanned,
2173            Some(self.db_metrics.clone()),
2174        )
2175    }
2176
2177    /// Returns an iterator visiting each key-value pair in the map. By providing
2178    /// bounds for the scan range, RocksDB can avoid unnecessary scans.
2179    /// The lower bound is inclusive, while the upper bound is exclusive.
2180    fn iter_with_bounds(
2181        &'a self,
2182        lower_bound: Option<K>,
2183        upper_bound: Option<K>,
2184    ) -> Self::Iterator {
2185        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2186        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2187        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2188        Iter::new(
2189            self.cf.clone(),
2190            db_iter,
2191            _timer,
2192            _perf_ctx,
2193            bytes_scanned,
2194            keys_scanned,
2195            Some(self.db_metrics.clone()),
2196        )
2197    }
2198
2199    /// Similar to `iter_with_bounds` but allows specifying
2200    /// inclusivity/exclusivity of ranges explicitly. TODO: find better name
2201    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2202        let readopts = self.create_read_options_with_range(range);
2203        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2204        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2205        Iter::new(
2206            self.cf.clone(),
2207            db_iter,
2208            _timer,
2209            _perf_ctx,
2210            bytes_scanned,
2211            keys_scanned,
2212            Some(self.db_metrics.clone()),
2213        )
2214    }
2215
2216    fn safe_iter(&'a self) -> Self::SafeIterator {
2217        let db_iter = self
2218            .rocksdb
2219            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2220        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2221        SafeIter::new(
2222            self.cf.clone(),
2223            db_iter,
2224            _timer,
2225            _perf_ctx,
2226            bytes_scanned,
2227            keys_scanned,
2228            Some(self.db_metrics.clone()),
2229        )
2230    }
2231
2232    fn safe_iter_with_bounds(
2233        &'a self,
2234        lower_bound: Option<K>,
2235        upper_bound: Option<K>,
2236    ) -> Self::SafeIterator {
2237        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2238        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2239        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2240        SafeIter::new(
2241            self.cf.clone(),
2242            db_iter,
2243            _timer,
2244            _perf_ctx,
2245            bytes_scanned,
2246            keys_scanned,
2247            Some(self.db_metrics.clone()),
2248        )
2249    }
2250
2251    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2252        let readopts = self.create_read_options_with_range(range);
2253        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2254        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2255        SafeIter::new(
2256            self.cf.clone(),
2257            db_iter,
2258            _timer,
2259            _perf_ctx,
2260            bytes_scanned,
2261            keys_scanned,
2262            Some(self.db_metrics.clone()),
2263        )
2264    }
2265
2266    fn keys(&'a self) -> Self::Keys {
2267        let mut db_iter = self
2268            .rocksdb
2269            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2270        db_iter.seek_to_first();
2271
2272        Keys::new(db_iter)
2273    }
2274
2275    fn values(&'a self) -> Self::Values {
2276        let mut db_iter = self
2277            .rocksdb
2278            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2279        db_iter.seek_to_first();
2280
2281        Values::new(db_iter)
2282    }
2283
2284    /// Returns a vector of raw values corresponding to the keys provided.
2285    #[instrument(level = "trace", skip_all, err)]
2286    fn multi_get_raw_bytes<J>(
2287        &self,
2288        keys: impl IntoIterator<Item = J>,
2289    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
2290    where
2291        J: Borrow<K>,
2292    {
2293        let results = self
2294            .multi_get_pinned(keys)?
2295            .into_iter()
2296            .map(|val| val.map(|v| v.to_vec()))
2297            .collect();
2298        Ok(results)
2299    }
2300
2301    /// Returns a vector of values corresponding to the keys provided.
2302    #[instrument(level = "trace", skip_all, err)]
2303    fn multi_get<J>(
2304        &self,
2305        keys: impl IntoIterator<Item = J>,
2306    ) -> Result<Vec<Option<V>>, TypedStoreError>
2307    where
2308        J: Borrow<K>,
2309    {
2310        let results = self.multi_get_pinned(keys)?;
2311        let values_parsed: Result<Vec<_>, TypedStoreError> = results
2312            .into_iter()
2313            .map(|value_byte| match value_byte {
2314                Some(data) => Ok(Some(
2315                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2316                )),
2317                None => Ok(None),
2318            })
2319            .collect();
2320
2321        values_parsed
2322    }
2323
2324    /// Returns a vector of values for the keys provided, reading them from a snapshot in chunks of `chunk_size`.
2325    #[instrument(level = "trace", skip_all, err)]
2326    fn chunked_multi_get<J>(
2327        &self,
2328        keys: impl IntoIterator<Item = J>,
2329        chunk_size: usize,
2330    ) -> Result<Vec<Option<V>>, TypedStoreError>
2331    where
2332        J: Borrow<K>,
2333    {
2334        let cf = self.cf();
2335        let keys_bytes = keys
2336            .into_iter()
2337            .map(|k| (&cf, be_fix_int_ser(k.borrow()).unwrap()));
2338        let chunked_keys = keys_bytes.into_iter().chunks(chunk_size);
2339        let snapshot = self.snapshot()?;
2340        let mut results = vec![];
2341        for chunk in chunked_keys.into_iter() {
2342            let chunk_result = snapshot.multi_get_cf(chunk);
2343            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
2344                .into_iter()
2345                .map(|value_byte| {
2346                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
2347                    match value_byte {
2348                        Some(data) => Ok(Some(
2349                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2350                        )),
2351                        None => Ok(None),
2352                    }
2353                })
2354                .collect();
2355            results.extend(values_parsed?);
2356        }
2357        Ok(results)
2358    }
2359
2360    /// Convenience method for batch insertion
2361    #[instrument(level = "trace", skip_all, err)]
2362    fn multi_insert<J, U>(
2363        &self,
2364        key_val_pairs: impl IntoIterator<Item = (J, U)>,
2365    ) -> Result<(), Self::Error>
2366    where
2367        J: Borrow<K>,
2368        U: Borrow<V>,
2369    {
2370        let mut batch = self.batch();
2371        batch.insert_batch(self, key_val_pairs)?;
2372        batch.write()
2373    }
2374
2375    /// Convenience method for batch removal
2376    #[instrument(level = "trace", skip_all, err)]
2377    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2378    where
2379        J: Borrow<K>,
2380    {
2381        let mut batch = self.batch();
2382        batch.delete_batch(self, keys)?;
2383        batch.write()
2384    }
2385
2386    /// Try to catch up with primary when running as secondary
2387    #[instrument(level = "trace", skip_all, err)]
2388    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2389        self.rocksdb
2390            .try_catch_up_with_primary()
2391            .map_err(typed_store_err_from_rocks_err)
2392    }
2393}
2394
2395impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
2396where
2397    J: Borrow<K>,
2398    U: Borrow<V>,
2399    K: Serialize,
2400    V: Serialize,
2401{
2402    type Error = TypedStoreError;
2403
2404    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
2405    where
2406        T: Iterator<Item = (J, U)>,
2407    {
2408        let mut batch = self.batch();
2409        batch.insert_batch(self, iter)?;
2410        batch.write()
2411    }
2412
2413    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
2414        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
2415        let mut batch = self.batch();
2416        batch.insert_batch(self, slice_of_refs)?;
2417        batch.write()
2418    }
2419}
2420
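/// Reads a `usize` from the environment variable `var_name`, returning `None`
/// if the variable is unset or does not parse; parse failures are logged as
/// warnings.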
2421pub fn read_size_from_env(var_name: &str) -> Option<usize> {
2422    env::var(var_name)
2423        .ok()?
2424        .parse::<usize>()
2425        .tap_err(|e| {
2426            warn!(
2427                "Env var {} does not contain valid usize integer: {}",
2428                var_name, e
2429            )
2430        })
2431        .ok()
2432}
2433
2434#[derive(Clone, Debug)]
2435pub struct ReadWriteOptions {
2436    pub ignore_range_deletions: bool,
2437    // Whether to sync to disk on every write.
2438    sync_to_disk: bool,
2439}
2440
2441impl ReadWriteOptions {
2442    pub fn readopts(&self) -> ReadOptions {
2443        let mut readopts = ReadOptions::default();
2444        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
2445        readopts
2446    }
2447
2448    pub fn writeopts(&self) -> WriteOptions {
2449        let mut opts = WriteOptions::default();
2450        opts.set_sync(self.sync_to_disk);
2451        opts
2452    }
2453
2454    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
2455        self.ignore_range_deletions = ignore;
2456        self
2457    }
2458}
2459
2460impl Default for ReadWriteOptions {
2461    fn default() -> Self {
2462        Self {
2463            ignore_range_deletions: true,
2464            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
2465        }
2466    }
2467}
2468// TODO: refactor this into a builder pattern, where rocksdb::Options are
2469// generated after a call to build().
2470#[derive(Default, Clone)]
2471pub struct DBOptions {
2472    pub options: rocksdb::Options,
2473    pub rw_options: ReadWriteOptions,
2474}
2475
2476impl DBOptions {
2477    // Optimize lookup perf for tables where no scans are performed.
2478    // If a non-trivial number of values can be > 512B in size, it is beneficial to
2479    // also specify optimize_for_large_values_no_scan().
2480    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
2481        // NOTE: this overwrites the block options.
2482        self.options
2483            .optimize_for_point_lookup(block_cache_size_mb as u64);
2484        self
2485    }
2486
2487    // Optimize write and lookup perf for tables which are rarely scanned, and have
2488    // large values. https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
2489    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
2490        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
2491            info!("Large value blob storage optimization is disabled via env var.");
2492            return self;
2493        }
2494
2495        // Blob settings.
2496        self.options.set_enable_blob_files(true);
2497        self.options
2498            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
2499        self.options.set_enable_blob_gc(true);
2500        // Since each blob can have non-trivial size overhead, and compression does not
2501        // work across blobs, set a min blob size in bytes so that small
2502        // transactions and effects are kept in sst files.
2503        self.options.set_min_blob_size(min_blob_size);
2504
2505        // Increase write buffer size to 256MiB.
2506        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2507            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2508            * 1024
2509            * 1024;
2510        self.options.set_write_buffer_size(write_buffer_size);
2511        // Since large blobs are not in sst files, reduce the target file size and base
2512        // level target size.
2513        let target_file_size_base = 64 << 20;
2514        self.options
2515            .set_target_file_size_base(target_file_size_base);
2516        // Level 1 defaults to 64MiB * 4 ~ 256MiB.
2517        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2518            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2519        self.options
2520            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
2521
2522        self
2523    }
2524
2525    // Optimize tables with a mix of lookup and scan workloads.
2526    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
2527        self.options
2528            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
2529        self
2530    }
2531
2532    // Optimize DB receiving significant insertions.
2533    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
2534        self.options
2535            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
2536        self.options
2537            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
2538        self
2539    }
2540
2541    // Optimize tables receiving significant insertions.
2542    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
2543        // Increase write buffer size to 256MiB.
2544        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2545            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2546            * 1024
2547            * 1024;
2548        self.options.set_write_buffer_size(write_buffer_size);
2549        // Allow up to 6 write buffers before slowing down writes.
2550        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2551            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2552        self.options
2553            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2554        // Keep 1 write buffer so recent writes can be read from memory.
2555        self.options
2556            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2557
2558        // Raise the level 0 file count compaction trigger (default 4, overridable via env var).
2559        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2560            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2561        self.options.set_level_zero_file_num_compaction_trigger(
2562            max_level_zero_file_num.try_into().unwrap(),
2563        );
2564        self.options.set_level_zero_slowdown_writes_trigger(
2565            (max_level_zero_file_num * 12).try_into().unwrap(),
2566        );
2567        self.options
2568            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2569
2570        // Increase sst file size to 128MiB.
2571        self.options.set_target_file_size_base(
2572            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2573                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2574                * 1024
2575                * 1024,
2576        );
2577
2578        // Increase level 1 target size to write_buffer_size * L0 trigger (256MiB * 4 ~ 1GiB by default).
2579        self.options
2580            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2581
2582        self
2583    }
2584
2585    // Optimize tables receiving significant insertions, without any deletions.
2586    // TODO: merge this function with optimize_for_write_throughput(), and use a
2587    // flag to indicate if deletion is received.
2588    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
2589        // Increase write buffer size to 256MiB.
2590        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2591            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2592            * 1024
2593            * 1024;
2594        self.options.set_write_buffer_size(write_buffer_size);
2595        // Allow up to 6 write buffers before slowing down writes.
2596        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2597            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2598        self.options
2599            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2600        // Keep 1 write buffer so recent writes can be read from memory.
2601        self.options
2602            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2603
2604        // Switch to universal compactions.
2605        self.options
2606            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
2607        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
2608        compaction_options.set_max_size_amplification_percent(10000);
2609        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
2610        self.options
2611            .set_universal_compaction_options(&compaction_options);
2612
2613        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2614            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
2615        self.options.set_level_zero_file_num_compaction_trigger(
2616            max_level_zero_file_num.try_into().unwrap(),
2617        );
2618        self.options.set_level_zero_slowdown_writes_trigger(
2619            (max_level_zero_file_num * 12).try_into().unwrap(),
2620        );
2621        self.options
2622            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2623
2624        // Increase sst file size to 128MiB.
2625        self.options.set_target_file_size_base(
2626            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2627                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2628                * 1024
2629                * 1024,
2630        );
2631
2632        // This should be a no-op for universal compaction but increasing it to be safe.
2633        self.options
2634            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2635
2636        self
2637    }
2638
2639    // Overrides the block options with different block cache size and block size.
2640    pub fn set_block_options(
2641        mut self,
2642        block_cache_size_mb: usize,
2643        block_size_bytes: usize,
2644    ) -> DBOptions {
2645        self.options
2646            .set_block_based_table_factory(&get_block_options(
2647                block_cache_size_mb,
2648                block_size_bytes,
2649            ));
2650        self
2651    }
2652
2653    // Disables write stalling and stopping based on pending compaction bytes.
2654    pub fn disable_write_throttling(mut self) -> DBOptions {
2655        self.options.set_soft_pending_compaction_bytes_limit(0);
2656        self.options.set_hard_pending_compaction_bytes_limit(0);
2657        self
2658    }
2659}
2660
2661/// Creates default RocksDB options, to be used when RocksDB options are
2662/// unspecified.
2663pub fn default_db_options() -> DBOptions {
2664    let mut opt = rocksdb::Options::default();
2665
2666    // One common issue when running tests on Mac is that the default ulimit is too
2667    // low, leading to I/O errors such as "Too many open files". Raise the fd
2668    // limit to work around it.
2669    if let Some(limit) = fdlimit::raise_fd_limit() {
2670        // On Windows, raise_fd_limit returns None.
2671        opt.set_max_open_files((limit / 8) as i32);
2672    }
2673
2674    // The table cache is locked for updates and this determines the number
2675    // of shards, i.e. 2^10. Increase it in case of lock contention.
2676    opt.set_table_cache_num_shard_bits(10);
2677
2678    // LSM compression settings
2679    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
2680    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
2681    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
2682
2683    // IOTA uses multiple RocksDB instances in a node, so the total sizes of write
2684    // buffers and WAL can be higher than the limits below.
2685    //
2686    // RocksDB also exposes the option to configure total write buffer size across
2687    // multiple instances via `write_buffer_manager`. But the write buffer flush
2688    // policy (flushing the buffer receiving the next write) may not work well.
2689    // So sticking to per-db write buffer size limit for now.
2690    //
2691    // The environment variables are only meant to be emergency overrides. They may
2692    // go away in future. It is preferable to update the default value, or
2693    // override the option in code.
2694    opt.set_db_write_buffer_size(
2695        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
2696            * 1024
2697            * 1024,
2698    );
2699    opt.set_max_total_wal_size(
2700        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
2701    );
2702
2703    // Num threads for compactions and memtable flushes.
2704    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
2705
2706    opt.set_enable_pipelined_write(true);
2707
2708    // Increase block size to 16KiB.
2709    // https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md#indexes-and-filter-blocks
2710    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
2711
2712    // Set memtable bloomfilter.
2713    opt.set_memtable_prefix_bloom_ratio(0.02);
2714
2715    DBOptions {
2716        options: opt,
2717        rw_options: ReadWriteOptions::default(),
2718    }
2719}
2720
2721fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
2722    // Set options mostly similar to those used in optimize_for_point_lookup(),
2723    // except non-default binary and hash index, to hopefully reduce lookup
2724    // latencies without causing any regression for scanning, with slightly more
2725    // memory usage. https://github.com/facebook/rocksdb/blob/11cb6af6e5009c51794641905ca40ce5beec7fee/options/options.cc#L611-L621
2726    let mut block_options = BlockBasedOptions::default();
2727    // Overrides block size.
2728    block_options.set_block_size(block_size_bytes);
2729    // Configure a block cache.
2730    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
2731    // Set a bloomfilter with 1% false positive rate.
2732    block_options.set_bloom_filter(10.0, false);
2733    // From https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#caching-index-and-filter-blocks
2734    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
2735    block_options
2736}
2737
2738/// Opens a database with options, and a number of column families that are
2739/// created if they do not exist.
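///
/// A minimal usage sketch (not a compiled doctest; the path and column family
/// names are placeholders, and `metric_conf` is an already constructed
/// `MetricConf`):
///
/// ```ignore
/// let db = open_cf("/tmp/mydb", None, metric_conf, &["table_a", "table_b"])?;
/// ```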
2740#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
2741pub fn open_cf<P: AsRef<Path>>(
2742    path: P,
2743    db_options: Option<rocksdb::Options>,
2744    metric_conf: MetricConf,
2745    opt_cfs: &[&str],
2746) -> Result<Arc<RocksDB>, TypedStoreError> {
2747    let options = db_options.unwrap_or_else(|| default_db_options().options);
2748    let column_descriptors: Vec<_> = opt_cfs
2749        .iter()
2750        .map(|name| (*name, options.clone()))
2751        .collect();
2752    open_cf_opts(
2753        path,
2754        Some(options.clone()),
2755        metric_conf,
2756        &column_descriptors[..],
2757    )
2758}
2759
2760fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
2761    // Customize database options
2762    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2763    options.create_if_missing(true);
2764    options.create_missing_column_families(true);
2765    options
2766}
2767
2768/// Opens a database with options, and a number of column families with
2769/// individual options that are created if they do not exist.
2770#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2771pub fn open_cf_opts<P: AsRef<Path>>(
2772    path: P,
2773    db_options: Option<rocksdb::Options>,
2774    metric_conf: MetricConf,
2775    opt_cfs: &[(&str, rocksdb::Options)],
2776) -> Result<Arc<RocksDB>, TypedStoreError> {
2777    let path = path.as_ref();
2778    // In the simulator, we intercept the wall clock in the test thread only. This
2779    // causes problems because rocksdb uses the simulated clock when creating
2780    // its background threads, but then those threads see the real wall clock
2781    // (because they are not the test thread), which causes rocksdb to panic.
2782    // The `nondeterministic` macro evaluates expressions in new threads, which
2783    // resolves the issue.
2784    //
2785    // This is a no-op in non-simulator builds.
2786
2787    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2788    nondeterministic!({
2789        let options = prepare_db_options(db_options);
2790        let rocksdb = {
2791            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2792                &options,
2793                path,
2794                cfs.into_iter()
2795                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2796            )
2797            .map_err(typed_store_err_from_rocks_err)?
2798        };
2799        Ok(Arc::new(RocksDB::DBWithThreadMode(
2800            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2801        )))
2802    })
2803}
2804
2805/// Opens a database with options, and a number of column families with
2806/// individual options that are created if they do not exist.
2807#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2808pub fn open_cf_opts_transactional<P: AsRef<Path>>(
2809    path: P,
2810    db_options: Option<rocksdb::Options>,
2811    metric_conf: MetricConf,
2812    opt_cfs: &[(&str, rocksdb::Options)],
2813) -> Result<Arc<RocksDB>, TypedStoreError> {
2814    let path = path.as_ref();
2815    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2816    // See comment above for explanation of why nondeterministic is necessary here.
2817    nondeterministic!({
2818        let options = prepare_db_options(db_options);
2819        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
2820            &options,
2821            path,
2822            cfs.into_iter()
2823                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2824        )
2825        .map_err(typed_store_err_from_rocks_err)?;
2826        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
2827            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2828        )))
2829    })
2830}
2831
2832/// Opens a database with options, and a number of column families with
2833/// individual options that are created if they do not exist.
2834pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2835    primary_path: P,
2836    secondary_path: Option<P>,
2837    db_options: Option<rocksdb::Options>,
2838    metric_conf: MetricConf,
2839    opt_cfs: &[(&str, rocksdb::Options)],
2840) -> Result<Arc<RocksDB>, TypedStoreError> {
2841    let primary_path = primary_path.as_ref();
2842    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2843    // See comment above for explanation of why nondeterministic is necessary here.
2844    nondeterministic!({
2845        // Customize database options
2846        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2847
2848        fdlimit::raise_fd_limit();
2849        // This is a requirement by RocksDB when opening as secondary
2850        options.set_max_open_files(-1);
2851
2852        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2853        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2854            .ok()
2855            .unwrap_or_default();
2856
2857        let default_db_options = default_db_options();
2858        // Add CFs not explicitly listed
2859        for cf_key in cfs.iter() {
2860            if !opt_cfs.contains_key(&cf_key[..]) {
2861                opt_cfs.insert(cf_key, default_db_options.options.clone());
2862            }
2863        }
2864
2865        let primary_path = primary_path.to_path_buf();
2866        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2867            let mut s = primary_path.clone();
2868            s.pop();
2869            s.push("SECONDARY");
2870            s.as_path().to_path_buf()
2871        });
2872
2873        let rocksdb = {
2874            options.create_if_missing(true);
2875            options.create_missing_column_families(true);
2876            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2877                &options,
2878                &primary_path,
2879                &secondary_path,
2880                opt_cfs
2881                    .iter()
2882                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2883            )
2884            .map_err(typed_store_err_from_rocks_err)?;
2885            db.try_catch_up_with_primary()
2886                .map_err(typed_store_err_from_rocks_err)?;
2887            db
2888        };
2889        Ok(Arc::new(RocksDB::DBWithThreadMode(
2890            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
2891        )))
2892    })
2893}
2894
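/// Lists the column family (table) names in the database at `path`, excluding
/// the unused `default` column family.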
2895pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2896    const DB_DEFAULT_CF_NAME: &str = "default";
2897
2898    let opts = rocksdb::Options::default();
2899    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2900        .map_err(|e| e.into())
2901        .map(|q| {
2902            q.iter()
2903                .filter_map(|s| {
2904                    // The `default` table is not used
2905                    if s != DB_DEFAULT_CF_NAME {
2906                        Some(s.clone())
2907                    } else {
2908                        None
2909                    }
2910                })
2911                .collect()
2912        })
2913}
2914
2915/// Serializes a key using big-endian, fixed-width integer encoding so that byte-wise key order matches numeric order; RocksDB stores keys in byte order and exposes a seek operator on iterators, see `https://github.com/facebook/rocksdb/wiki/Iterator#introduction`
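///
/// A minimal sketch of the ordering property (not a compiled doctest):
///
/// ```ignore
/// let a = be_fix_int_ser(&1u64)?;
/// let b = be_fix_int_ser(&2u64)?;
/// // Big-endian, fixed-width encoding preserves numeric order as byte order,
/// // which is what RocksDB's lexicographic key ordering relies on.
/// assert!(a < b);
/// ```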
2916#[inline]
2917pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2918where
2919    S: ?Sized + serde::Serialize,
2920{
2921    bincode::DefaultOptions::new()
2922        .with_big_endian()
2923        .with_fixint_encoding()
2924        .serialize(t)
2925        .map_err(typed_store_err_from_bincode_err)
2926}
2927
2928#[derive(Clone)]
2929pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
2930impl DBMapTableConfigMap {
2931    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
2932        Self(map)
2933    }
2934
2935    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
2936        self.0.clone()
2937    }
2938}
2939
2940pub enum RocksDBAccessType {
2941    Primary,
2942    Secondary(Option<PathBuf>),
2943}
2944
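/// Destroys the RocksDB database at `path`, deleting its files on disk.
/// The database should not be open when this is called.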
2945pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
2946    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
2947}
2948
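/// Returns the requested column families plus any column families already
/// present on disk at `path` (opened with default options), since RocksDB
/// requires every existing column family to be opened.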
2949fn populate_missing_cfs(
2950    input_cfs: &[(&str, rocksdb::Options)],
2951    path: &Path,
2952) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2953    let mut cfs = vec![];
2954    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2955    let existing_cfs =
2956        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2957            .ok()
2958            .unwrap_or_default();
2959
2960    for cf_name in existing_cfs {
2961        if !input_cf_index.contains(&cf_name[..]) {
2962            cfs.push((cf_name, rocksdb::Options::default()));
2963        }
2964    }
2965    cfs.extend(
2966        input_cfs
2967            .iter()
2968            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2969    );
2970    Ok(cfs)
2971}
2972
2973/// Given a byte slice, compute in place the value that is one greater than the
2974/// slice interpreted as a big-endian number.
2975/// If the slice is already at its maximum (all 0xFF bytes), leave it unchanged.
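///
/// For example (sketch, not a compiled doctest):
///
/// ```ignore
/// let mut v = vec![0x00, 0xFF];
/// big_endian_saturating_add_one(&mut v);
/// assert_eq!(v, vec![0x01, 0x00]); // the carry propagates into the higher byte
/// ```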
2976fn big_endian_saturating_add_one(v: &mut [u8]) {
2977    if is_max(v) {
2978        return;
2979    }
2980    for i in (0..v.len()).rev() {
2981        if v[i] == u8::MAX {
2982            v[i] = 0;
2983        } else {
2984            v[i] += 1;
2985            break;
2986        }
2987    }
2988}
2989
2990/// Check if all the bytes in the vector are 0xFF
2991fn is_max(v: &[u8]) -> bool {
2992    v.iter().all(|&x| x == u8::MAX)
2993}
2994
2995// `clippy::manual_div_ceil` is raised by code expanded by the
2996// `uint::construct_uint!` macro, so it has to be fixed upstream in `uint`.
2997#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
2998#[test]
2999fn test_helpers() {
3000    let v = vec![];
3001    assert!(is_max(&v));
3002
3003    fn check_add(v: Vec<u8>) {
3004        let mut v = v;
3005        let num = Num32::from_big_endian(&v);
3006        big_endian_saturating_add_one(&mut v);
3007        assert!(num + 1 == Num32::from_big_endian(&v));
3008    }
3009
3010    uint::construct_uint! {
3011        // 32 byte number
3012        struct Num32(4);
3013    }
3014
3015    let mut v = vec![255; 32];
3016    big_endian_saturating_add_one(&mut v);
3017    assert!(Num32::MAX == Num32::from_big_endian(&v));
3018
3019    check_add(vec![1; 32]);
3020    check_add(vec![6; 32]);
3021    check_add(vec![254; 32]);
3022
3023    // TBD: More tests coming with randomized arrays
3024}