typed_store/rocks/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

pub mod errors;
pub(crate) mod iter;
pub(crate) mod keys;
pub(crate) mod safe_iter;
pub mod util;
pub(crate) mod values;

use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashSet},
    env,
    ffi::CStr,
    marker::PhantomData,
    ops::{Bound, RangeBounds},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use bincode::Options;
use collectable::TryExtend;
use iota_macros::{fail_point, nondeterministic};
use itertools::Itertools;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::{
    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
};
use serde::{Serialize, de::DeserializeOwned};
use tap::TapFallible;
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};

use self::{iter::Iter, keys::Keys, values::Values};
use crate::{
    TypedStoreError,
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    rocks::{
        errors::{
            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
            typed_store_err_from_rocks_err,
        },
        safe_iter::SafeIter,
    },
    traits::{Map, TableSummary},
};

// Write buffer size per RocksDB instance can be set via the env var below.
// If the env var is not set, use the default value in MiB.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// Write ahead log size per RocksDB instance can be set via the env var below.
// If the env var is not set, use the default value in MiB.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Environment variable to control behavior of write throughput optimized
// tables.
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// Set to 1 to disable blob storage for transactions and effects.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";

#[cfg(test)]
mod tests;

/// A helper macro to reopen multiple column families. The macro returns
/// a tuple of DBMap structs in the same order that the column families
/// are defined.
///
/// # Arguments
///
/// * `db` - a reference to a rocks DB object
/// * `cf;<ty,ty>` - a comma-separated list of column families to open. Each
///   entry consists of the column family name (cf) followed by its key and
///   value types `<ty, ty>`.
///
/// # Examples
///
/// Successfully open two different column families.
/// ```
/// use typed_store::reopen;
/// use typed_store::rocks::*;
/// use tempfile::tempdir;
/// use prometheus::Registry;
/// use std::sync::Arc;
/// use typed_store::metrics::DBMetrics;
/// use core::fmt::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// const FIRST_CF: &str = "First_CF";
/// const SECOND_CF: &str = "Second_CF";
///
///
/// /// Create the rocks database reference for the desired column families
/// let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &[FIRST_CF, SECOND_CF]).unwrap();
///
/// /// Now simply open all the column families for their expected Key-Value types
/// let (db_map_1, db_map_2) = reopen!(&rocks, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
/// Ok(())
/// }
/// ```
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
            ),*
        )
    };
}

/// Repeatedly attempt an optimistic transaction until it succeeds or the
/// maximum number of retries is exceeded (20 by default; `None` means
/// unlimited). Callsites that cannot proceed after failed writes (e.g. the
/// consensus handler) should use `retry_transaction_forever!`, which retries
/// until the transaction succeeds.
#[macro_export]
macro_rules! retry_transaction {
    ($transaction:expr) => {
        retry_transaction!($transaction, Some(20))
    };

    (
        $transaction:expr,
        $max_retries:expr // should be an Option<int type>, None for unlimited
        $(,)?

    ) => {{
        use rand::{
            distributions::{Distribution, Uniform},
            rngs::ThreadRng,
        };
        use tokio::time::{Duration, sleep};
        use tracing::{error, info};

        let mut retries = 0;
        let max_retries = $max_retries;
        loop {
            let status = $transaction;
            match status {
                Err(TypedStoreError::RetryableTransaction) => {
                    retries += 1;
                    // Randomized delay to help racing transactions get out of each other's way.
                    let delay = {
                        let mut rng = ThreadRng::default();
                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
                    };
                    if let Some(max_retries) = max_retries {
                        if retries > max_retries {
                            error!(?max_retries, "max retries exceeded");
                            break status;
                        }
                    }
                    if retries > 10 {
                        // TODO: monitoring needed?
                        error!(?delay, ?retries, "excessive transaction retries...");
                    } else {
                        info!(
                            ?delay,
                            ?retries,
                            "transaction write conflict detected, sleeping"
                        );
                    }
                    sleep(delay).await;
                }
                _ => break status,
            }
        }
    }};
}

#[macro_export]
macro_rules! retry_transaction_forever {
    ($transaction:expr) => {
        $crate::retry_transaction!($transaction, None)
    };
}
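
// Example (illustrative sketch, not part of the original source): retrying an
// operation that may return `TypedStoreError::RetryableTransaction`, capping
// the attempts at 5. The `do_transactional_write()` helper is hypothetical.
//
//     let result = retry_transaction!(do_transactional_write(), Some(5));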

#[derive(Debug)]
pub struct DBWithThreadModeWrapper {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl DBWithThreadModeWrapper {
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for DBWithThreadModeWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub struct OptimisticTransactionDBWrapper {
    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl OptimisticTransactionDBWrapper {
    fn new(
        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for OptimisticTransactionDBWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

/// Thin wrapper to unify interface across different db types
#[derive(Debug)]
pub enum RocksDB {
    DBWithThreadMode(DBWithThreadModeWrapper),
    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
}

macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
        }
    }
}
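
// For illustration (not part of the original source): a call such as
// `delegate_call!(self.flush())` expands to a match that forwards the call to
// whichever underlying database variant is in use, roughly:
//
//     match self {
//         Self::DBWithThreadMode(d) => d.underlying.flush(),
//         Self::OptimisticTransactionDB(d) => d.underlying.flush(),
//     }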

impl Drop for RocksDB {
    fn drop(&mut self) {
        delegate_call!(self.cancel_all_background_work(/* wait */ true))
    }
}

impl RocksDB {
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        delegate_call!(self.get(key))
    }

    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        delegate_call!(self.multi_get_cf_opt(keys, readopts))
    }

    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
    }

    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        delegate_call!(self.property_int_value_cf(cf, name))
    }

    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
    }

    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        delegate_call!(self.cf_handle(name))
    }

    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.create_cf(name, opts))
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        delegate_call!(self.drop_cf(name))
    }

    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.delete_file_in_range_cf(cf, from, to))
    }

    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
        fail_point!("delete-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn path(&self) -> &Path {
        delegate_call!(self.path())
    }

    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
        fail_point!("put-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
    }

    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        delegate_call!(self.try_catch_up_with_primary())
    }

    pub fn write(
        &self,
        batch: RocksDBBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (self, batch) {
            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            _ => Err(TypedStoreError::RocksDB(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn transaction_without_snapshot(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
            Self::DBWithThreadMode(_) => panic!(),
        }
    }

    pub fn transaction(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => {
                let mut tx_opts = OptimisticTransactionOptions::new();
                tx_opts.set_snapshot(true);

                Ok(db
                    .underlying
                    .transaction_opt(&WriteOptions::default(), &tx_opts))
            }
            Self::DBWithThreadMode(_) => panic!(),
        }
    }

    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
            }
            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
            ),
        }
    }

    pub fn iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
        mode: IteratorMode<'_>,
    ) -> RocksDBIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
            }
            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
            ),
        }
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        delegate_call!(self.compact_range_cf(cf, start, end))
    }

    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
        match self {
            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
            Self::OptimisticTransactionDB(d) => {
                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
            }
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint = match self {
            Self::DBWithThreadMode(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
            Self::OptimisticTransactionDB(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
        };
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        delegate_call!(self.flush_cf(cf))
    }

    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.set_options_cf(cf, opts))
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
        }
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
        }
    }

    pub fn db_name(&self) -> String {
        let name = match self {
            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
        };
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        delegate_call!(self.live_files())
    }
}
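
// Example (illustrative, not part of the original source): creating an on-disk
// checkpoint of an open database; the target path is an assumption for the
// example and should not already exist.
//
//     db.checkpoint(Path::new("/tmp/db_checkpoint"))?;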

// Check if the database is marked as corrupted, and if so, return an error.
// If the corrupted key is not set, we set it to [1].
pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;

    db.get(DB_CORRUPTED_KEY)
        .map_err(|e| format!("Failed to open database: {e}"))
        .and_then(|value| match value {
            Some(v) if v[0] == 1 => Err(
                "Database is corrupted, please remove the current database and start clean!"
                    .to_string(),
            ),
            Some(_) => Ok(()),
            None => db
                .put(DB_CORRUPTED_KEY, [1])
                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
        })?;

    Ok(())
}

pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
}
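
// Example (hedged sketch, not part of the original source): one way the
// corruption marker could be used around startup and shutdown. Paths and
// error handling are illustrative.
//
//     // On startup: refuse to run on a database still marked as corrupted.
//     check_and_mark_db_corruption(&db_path).expect("database marked corrupted");
//     // ... run ...
//     // After a clean shutdown: clear the marker for the next start.
//     unmark_db_corruption(&db_path).expect("failed to clear corruption marker");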

pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}

impl<'a> RocksDBSnapshot<'a> {
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }
    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}

pub enum RocksDBBatch {
    Regular(rocksdb::WriteBatch),
    Transactional(rocksdb::WriteBatchWithTransaction<true>),
}

macro_rules! delegate_batch_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::Regular(b) => b.$method($($args),*),
            Self::Transactional(b) => b.$method($($args),*),
        }
    }
}

impl RocksDBBatch {
    fn size_in_bytes(&self) -> usize {
        delegate_batch_call!(self.size_in_bytes())
    }

    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        delegate_batch_call!(self.delete_cf(cf, key))
    }

    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.put_cf(cf, key, value))
    }

    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.merge_cf(cf, key, value))
    }

    pub fn delete_range_cf<K: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), TypedStoreError> {
        match self {
            Self::Regular(batch) => {
                batch.delete_range_cf(cf, from, to);
                Ok(())
            }
            Self::Transactional(_) => panic!(),
        }
    }
}

#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}
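
// Example (illustrative, not part of the original source): constructing a
// `MetricConf` for a named database and overriding the read sampling interval.
// `SamplingInterval::default()` stands in for whatever interval the caller
// actually wants.
//
//     let conf = MetricConf::new("example_db")
//         .with_sampling(SamplingInterval::default());
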
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

/// An interface to a RocksDB database, keyed by a column family.
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    _phantom: PhantomData<fn(K) -> V>,
    // the RocksDB ColumnFamily under which the map is stored
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    /// Opens a database from a path, with specific options and an optional
    /// column family.
    ///
    /// This database is used to perform operations on a single column family,
    /// and parametrizes all operations in `DBBatch` when writing across column
    /// families.
    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }
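
    // Example (hedged sketch, not part of the original source): opening a map
    // on a single column family directly via `DBMap::open`; the path, column
    // family name, and key/value types are illustrative.
    //
    //     let map: DBMap<u64, String> = DBMap::open(
    //         "/tmp/example_db",
    //         MetricConf::new("example_db"),
    //         None,
    //         Some("my_cf"),
    //         &ReadWriteOptions::default(),
    //     )?;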

    /// Reopens an open database as a typed map operating under a specific
    /// column family. If no column family is passed, the default column
    /// family is used.
    ///
    /// ```
    /// use core::fmt::Error;
    /// use std::sync::Arc;
    ///
    /// use prometheus::Registry;
    /// use tempfile::tempdir;
    /// use typed_store::{metrics::DBMetrics, rocks::*};
    /// #[tokio::main]
    /// async fn main() -> Result<(), Error> {
    ///     /// Open the DB with all needed column families first.
    ///     let rocks = open_cf(
    ///         tempdir().unwrap(),
    ///         None,
    ///         MetricConf::default(),
    ///         &["First_CF", "Second_CF"],
    ///     )
    ///     .unwrap();
    ///     /// Attach the column families to specific maps.
    ///     let db_cf_1 = DBMap::<u32, u32>::reopen(
    ///         &rocks,
    ///         Some("First_CF"),
    ///         &ReadWriteOptions::default(),
    ///         false,
    ///     )
    ///     .expect("Failed to open storage");
    ///     let db_cf_2 = DBMap::<u32, u32>::reopen(
    ///         &rocks,
    ///         Some("Second_CF"),
    ///         &ReadWriteOptions::default(),
    ///         false,
    ///     )
    ///     .expect("Failed to open storage");
    ///     Ok(())
    /// }
    /// ```
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

    pub fn batch(&self) -> DBBatch {
        let batch = match *self.rocksdb {
            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
            RocksDB::OptimisticTransactionDB(_) => {
                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
            }
        };
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }
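
    // Example (illustrative, not part of the original source): compacting the
    // full key range of a hypothetical `DBMap<u64, String>` named `table`.
    //
    //     table.compact_range(&u64::MIN, &u64::MAX)?;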

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
        self.rocksdb
            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    /// Returns a vector of raw values corresponding to the keys provided.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                // sorted_input=
                false,
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let Some(cf) = rocksdb.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                rocksdb.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        // 7 is the default number of levels in RocksDB. If we ever change the number of
        // levels using `set_num_levels`, we need to update here as well. Note
        // that there isn't an API to query the DB to get the number of levels (yet).
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new(&self.rocksdb)
    }

    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new_without_snapshot(&self.rocksdb)
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
        Ok(self.rocksdb.snapshot())
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let iter = self.iterator_cf().map(Result::unwrap);
        for (key, value) in iter {
            num_keys += 1;
            key_bytes_total += key.len();
            value_bytes_total += value.len();
            key_hist.record(key.len() as u64)?;
            value_hist.record(value.len() as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }
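
    // Example (illustrative, not part of the original source): printing a size
    // summary for a hypothetical map named `table`.
    //
    //     let summary = table.table_summary()?;
    //     println!("{} keys, {} key bytes", summary.num_keys, summary.key_bytes_total);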

    // Creates metrics and context for tracking iterator usage and performance.
    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    // Creates a RocksDB read option with the specified lower and upper bounds.
    // The lower bound is inclusive, and the upper bound is exclusive.
    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

    // Creates a RocksDB read option with lower and upper bounds set corresponding
    // to `range`.
    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                // Rocksdb lower bound is inclusive by default so nothing to do
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                // Since we want an exclusive bound, increment the key to exclude the bound itself
                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                // If the key is already at the limit, there's nowhere else to go, so no upper
                // bound
                if !is_max(&key_buf) {
                    // Since we want exclusive, we need to increment the key to get the upper bound
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                // Rocksdb upper bound is exclusive by default so nothing to do
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}
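
// Summary of the range-to-ReadOptions mapping implemented above (comment-only
// illustration, not part of the original source; `ser` stands for the
// `be_fix_int_ser` encoding and `+ 1` for `big_endian_saturating_add_one`):
//
//     Included(lower) -> set_iterate_lower_bound(ser(lower))      // lower bound is inclusive
//     Excluded(lower) -> set_iterate_lower_bound(ser(lower) + 1)
//     Included(upper) -> set_iterate_upper_bound(ser(upper) + 1)  // unless already the max key
//     Excluded(upper) -> set_iterate_upper_bound(ser(upper))      // upper bound is exclusive
//     Unbounded       -> no bound set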

/// Provides a mutable struct to form a collection of database write operations
/// and execute them.
///
/// Batching write and delete operations is faster than performing them one by
/// one and ensures their atomicity, i.e. they are all written or none is.
/// This is also true of operations across column families in the same database.
///
/// Serialization/deserialization and naming of column families are handled
/// by passing a DBMap<K, V> with each operation.
///
/// ```
/// use core::fmt::Error;
/// use std::sync::Arc;
///
/// use prometheus::Registry;
/// use tempfile::tempdir;
/// use typed_store::{Map, metrics::DBMetrics, rocks::*};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let rocks = open_cf(
///         tempfile::tempdir().unwrap(),
///         None,
///         MetricConf::default(),
///         &["First_CF", "Second_CF"],
///     )
///     .unwrap();
///
///     let db_cf_1 = DBMap::reopen(
///         &rocks,
///         Some("First_CF"),
///         &ReadWriteOptions::default(),
///         false,
///     )
///     .expect("Failed to open storage");
///     let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
///     let db_cf_2 = DBMap::reopen(
///         &rocks,
///         Some("Second_CF"),
///         &ReadWriteOptions::default(),
///         false,
///     )
///     .expect("Failed to open storage");
///     let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
///     let mut batch = db_cf_1.batch();
///     batch
///         .insert_batch(&db_cf_1, keys_vals_1.clone())
///         .expect("Failed to batch insert")
///         .insert_batch(&db_cf_2, keys_vals_2.clone())
///         .expect("Failed to batch insert");
///
///     let _ = batch.write().expect("Failed to execute batch");
///     for (k, v) in keys_vals_1 {
///         let val = db_cf_1.get(&k).expect("Failed to get inserted key");
///         assert_eq!(Some(v), val);
///     }
///
///     for (k, v) in keys_vals_2 {
///         let val = db_cf_2.get(&k).expect("Failed to get inserted key");
///         assert_eq!(Some(v), val);
///     }
///     Ok(())
/// }
/// ```
pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: RocksDBBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    /// Create a new batch associated with a DB reference.
    ///
    /// Use `open_cf` to get the DB reference, or use an existing open database.
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: RocksDBBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consume the batch and write its operations to the database
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}

impl DBBatch {
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);

                Ok(())
            })?;
        Ok(())
    }
1558
1559    /// Deletes a range of keys between `from` (inclusive) and `to`
1560    /// (exclusive) by writing a range delete tombstone in the db map.
1561    /// If the DBMap is configured with `ignore_range_deletions` set to false,
1562    /// the effect of this write is visible immediately, i.e. you won't
1563    /// see old values when you do a lookup or scan. But if it is configured
1564    /// with `ignore_range_deletions` set to true, the old values remain visible
1565    /// until compaction actually deletes them, which happens sometime later. By
1566    /// default `ignore_range_deletions` is set to true on a DBMap (unless it is
1567    /// overridden in the config), so please use this function with caution.
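    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming `db` is a
    /// `DBMap<u32, String>` and `low` / `high` are hypothetical bounds:
    ///
    /// ```ignore
    /// let mut batch = db.batch();
    /// // Tombstones every key in [low, high); nothing is applied until `write()`.
    /// batch.schedule_delete_range(&db, &low, &high)?;
    /// batch.write()?;
    /// ```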
1568    pub fn schedule_delete_range<K: Serialize, V>(
1569        &mut self,
1570        db: &DBMap<K, V>,
1571        from: &K,
1572        to: &K,
1573    ) -> Result<(), TypedStoreError> {
1574        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1575            return Err(TypedStoreError::CrossDBBatch);
1576        }
1577
1578        let from_buf = be_fix_int_ser(from)?;
1579        let to_buf = be_fix_int_ser(to)?;
1580
1581        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
1582        Ok(())
1583    }
1584
1585    /// inserts a range of (key, value) pairs given as an iterator
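    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming `db` is a
    /// `DBMap<u32, String>`:
    ///
    /// ```ignore
    /// let mut batch = db.batch();
    /// batch.insert_batch(&db, [(1u32, "a".to_string()), (2u32, "b".to_string())])?;
    /// batch.delete_batch(&db, [3u32])?;
    /// // All staged operations are applied atomically in a single write.
    /// batch.write()?;
    /// ```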
1586    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1587        &mut self,
1588        db: &DBMap<K, V>,
1589        new_vals: impl IntoIterator<Item = (J, U)>,
1590    ) -> Result<&mut Self, TypedStoreError> {
1591        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1592            return Err(TypedStoreError::CrossDBBatch);
1593        }
1594        let mut total = 0usize;
1595        new_vals
1596            .into_iter()
1597            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1598                let k_buf = be_fix_int_ser(k.borrow())?;
1599                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1600                total += k_buf.len() + v_buf.len();
1601                self.batch.put_cf(&db.cf(), k_buf, v_buf);
1602                Ok(())
1603            })?;
1604        self.db_metrics
1605            .op_metrics
1606            .rocksdb_batch_put_bytes
1607            .with_label_values(&[&db.cf])
1608            .observe(total as f64);
1609        Ok(self)
1610    }
1611
1612    /// merges a range of (key, value) pairs given as an iterator
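    ///
    /// Note: merge operands can only be resolved if the column family's
    /// `rocksdb::Options` were configured with a merge operator.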
1613    pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1614        &mut self,
1615        db: &DBMap<K, V>,
1616        new_vals: impl IntoIterator<Item = (J, U)>,
1617    ) -> Result<&mut Self, TypedStoreError> {
1618        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1619            return Err(TypedStoreError::CrossDBBatch);
1620        }
1621
1622        new_vals
1623            .into_iter()
1624            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1625                let k_buf = be_fix_int_ser(k.borrow())?;
1626                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1627                self.batch.merge_cf(&db.cf(), k_buf, v_buf);
1628                Ok(())
1629            })?;
1630        Ok(self)
1631    }
1632
1633    /// similar to `merge_batch` but allows merge with partial values
1634    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
1635        &mut self,
1636        db: &DBMap<K, V>,
1637        new_vals: impl IntoIterator<Item = (J, B)>,
1638    ) -> Result<&mut Self, TypedStoreError> {
1639        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1640            return Err(TypedStoreError::CrossDBBatch);
1641        }
1642        new_vals
1643            .into_iter()
1644            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1645                let k_buf = be_fix_int_ser(k.borrow())?;
1646                self.batch.merge_cf(&db.cf(), k_buf, v);
1647                Ok(())
1648            })?;
1649        Ok(self)
1650    }
1651}
1652
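/// A transaction over an optimistic-transaction RocksDB instance.
///
/// A minimal usage sketch (not compiled as a doctest), assuming `rocksdb` is the
/// `Arc<RocksDB>` the transactional database was opened with and `db` is a
/// `DBMap<u32, String>` on that same instance:
///
/// ```ignore
/// let mut tx = DBTransaction::new(&rocksdb)?;
/// let _current = tx.get_for_update(&db, &1u32)?; // read the key and track it for conflict detection
/// tx.insert_batch(&db, [(1u32, "updated".to_string())])?;
/// match tx.commit() {
///     // Write conflicts surface as a retryable error; callers are expected to retry.
///     Err(TypedStoreError::RetryableTransaction) => { /* retry the whole transaction */ }
///     other => other?,
/// }
/// ```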
1653pub struct DBTransaction<'a> {
1654    rocksdb: Arc<RocksDB>,
1655    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
1656}
1657
1658impl<'a> DBTransaction<'a> {
1659    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1660        Ok(Self {
1661            rocksdb: db.clone(),
1662            transaction: db.transaction()?,
1663        })
1664    }
1665
1666    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1667        Ok(Self {
1668            rocksdb: db.clone(),
1669            transaction: db.transaction_without_snapshot()?,
1670        })
1671    }
1672
1673    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1674        &mut self,
1675        db: &DBMap<K, V>,
1676        new_vals: impl IntoIterator<Item = (J, U)>,
1677    ) -> Result<&mut Self, TypedStoreError> {
1678        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1679            return Err(TypedStoreError::CrossDBBatch);
1680        }
1681
1682        new_vals
1683            .into_iter()
1684            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1685                let k_buf = be_fix_int_ser(k.borrow())?;
1686                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1687                self.transaction
1688                    .put_cf(&db.cf(), k_buf, v_buf)
1689                    .map_err(typed_store_err_from_rocks_err)?;
1690                Ok(())
1691            })?;
1692        Ok(self)
1693    }
1694
1695    /// Deletes a set of keys given as an iterator
1696    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1697        &mut self,
1698        db: &DBMap<K, V>,
1699        purged_vals: impl IntoIterator<Item = J>,
1700    ) -> Result<&mut Self, TypedStoreError> {
1701        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1702            return Err(TypedStoreError::CrossDBBatch);
1703        }
1704        purged_vals
1705            .into_iter()
1706            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1707                let k_buf = be_fix_int_ser(k.borrow())?;
1708                self.transaction
1709                    .delete_cf(&db.cf(), k_buf)
1710                    .map_err(typed_store_err_from_rocks_err)?;
1711                Ok(())
1712            })?;
1713        Ok(self)
1714    }
1715
1716    pub fn snapshot(
1717        &self,
1718    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
1719    {
1720        self.transaction.snapshot()
1721    }
1722
1723    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
1724        &self,
1725        db: &DBMap<K, V>,
1726        key: &K,
1727    ) -> Result<Option<V>, TypedStoreError> {
1728        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1729            return Err(TypedStoreError::CrossDBBatch);
1730        }
1731        let k_buf = be_fix_int_ser(key)?;
1732        match self
1733            .transaction
1734            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
1735            .map_err(typed_store_err_from_rocks_err)?
1736        {
1737            Some(data) => Ok(Some(
1738                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1739            )),
1740            None => Ok(None),
1741        }
1742    }
1743
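    /// Note: a stored value that fails to deserialize is silently returned as `None`.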
1744    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
1745        &self,
1746        db: &DBMap<K, V>,
1747        key: &K,
1748    ) -> Result<Option<V>, TypedStoreError> {
1749        let key_buf = be_fix_int_ser(key)?;
1750        self.transaction
1751            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
1752            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
1753            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
1754    }
1755
1756    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
1757        &self,
1758        db: &DBMap<K, V>,
1759        keys: impl IntoIterator<Item = J>,
1760    ) -> Result<Vec<Option<V>>, TypedStoreError> {
1761        let cf = db.cf();
1762        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
1763            .into_iter()
1764            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
1765            .collect();
1766
1767        let results = self
1768            .transaction
1769            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());
1770
1771        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1772            .into_iter()
1773            .map(
1774                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
1775                    Some(data) => Ok(Some(
1776                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1777                    )),
1778                    None => Ok(None),
1779                },
1780            )
1781            .collect();
1782
1783        values_parsed
1784    }
1785
1786    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
1787        &'a self,
1788        db: &DBMap<K, V>,
1789    ) -> Iter<'a, K, V> {
1790        let db_iter = self
1791            .transaction
1792            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
1793        Iter::new(
1794            db.cf.clone(),
1795            RocksDBRawIter::OptimisticTransaction(db_iter),
1796            None,
1797            None,
1798            None,
1799            None,
1800            None,
1801        )
1802    }
1803
1804    pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
1805        &'a self,
1806        db: &DBMap<K, V>,
1807    ) -> Keys<'a, K> {
1808        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1809            self.transaction
1810                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1811        );
1812        db_iter.seek_to_first();
1813
1814        Keys::new(db_iter)
1815    }
1816
1817    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
1818        &'a self,
1819        db: &DBMap<K, V>,
1820    ) -> Values<'a, V> {
1821        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1822            self.transaction
1823                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1824        );
1825        db_iter.seek_to_first();
1826
1827        Values::new(db_iter)
1828    }
1829
1830    pub fn commit(self) -> Result<(), TypedStoreError> {
1831        fail_point!("transaction-commit");
1832        self.transaction.commit().map_err(|e| match e.kind() {
1833            // empirically, this is what you get when there is a write conflict. it is not
1834            // documented whether this is the only time you can get this error.
1835            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
1836            _ => typed_store_err_from_rocks_err(e),
1837        })?;
1838        Ok(())
1839    }
1840}
1841
1842macro_rules! delegate_iter_call {
1843    ($self:ident.$method:ident($($args:ident),*)) => {
1844        match $self {
1845            Self::DB(db) => db.$method($($args),*),
1846            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
1847            Self::OptimisticTransaction(db) => db.$method($($args),*),
1848        }
1849    }
1850}
1851
1852pub enum RocksDBRawIter<'a> {
1853    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1854    OptimisticTransactionDB(
1855        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1856    ),
1857    OptimisticTransaction(
1858        rocksdb::DBRawIteratorWithThreadMode<
1859            'a,
1860            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1861        >,
1862    ),
1863}
1864
1865impl RocksDBRawIter<'_> {
1866    pub fn valid(&self) -> bool {
1867        delegate_iter_call!(self.valid())
1868    }
1869    pub fn key(&self) -> Option<&[u8]> {
1870        delegate_iter_call!(self.key())
1871    }
1872    pub fn value(&self) -> Option<&[u8]> {
1873        delegate_iter_call!(self.value())
1874    }
1875    pub fn next(&mut self) {
1876        delegate_iter_call!(self.next())
1877    }
1878    pub fn prev(&mut self) {
1879        delegate_iter_call!(self.prev())
1880    }
1881    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
1882        delegate_iter_call!(self.seek(key))
1883    }
1884    pub fn seek_to_last(&mut self) {
1885        delegate_iter_call!(self.seek_to_last())
1886    }
1887    pub fn seek_to_first(&mut self) {
1888        delegate_iter_call!(self.seek_to_first())
1889    }
1890    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
1891        delegate_iter_call!(self.seek_for_prev(key))
1892    }
1893    pub fn status(&self) -> Result<(), rocksdb::Error> {
1894        delegate_iter_call!(self.status())
1895    }
1896}
1897
1898pub enum RocksDBIter<'a> {
1899    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1900    OptimisticTransactionDB(
1901        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1902    ),
1903}
1904
1905impl Iterator for RocksDBIter<'_> {
1906    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
1907    fn next(&mut self) -> Option<Self::Item> {
1908        match self {
1909            Self::DB(db) => db.next(),
1910            Self::OptimisticTransactionDB(db) => db.next(),
1911        }
1912    }
1913}
1914
1915impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1916where
1917    K: Serialize + DeserializeOwned,
1918    V: Serialize + DeserializeOwned,
1919{
1920    type Error = TypedStoreError;
1921    type Iterator = Iter<'a, K, V>;
1922    type SafeIterator = SafeIter<'a, K, V>;
1923    type Keys = Keys<'a, K>;
1924    type Values = Values<'a, V>;
1925
1926    #[instrument(level = "trace", skip_all, err)]
1927    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1928        let key_buf = be_fix_int_ser(key)?;
1929        // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
1930        // but no false negatives. We use it to short-circuit the absent case
1931        let readopts = self.opts.readopts();
1932        Ok(self
1933            .rocksdb
1934            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1935            && self
1936                .rocksdb
1937                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1938                .map_err(typed_store_err_from_rocks_err)?
1939                .is_some())
1940    }
1941
1942    #[instrument(level = "trace", skip_all, err)]
1943    fn multi_contains_keys<J>(
1944        &self,
1945        keys: impl IntoIterator<Item = J>,
1946    ) -> Result<Vec<bool>, Self::Error>
1947    where
1948        J: Borrow<K>,
1949    {
1950        let values = self.multi_get_pinned(keys)?;
1951        Ok(values.into_iter().map(|v| v.is_some()).collect())
1952    }
1953
1954    #[instrument(level = "trace", skip_all, err)]
1955    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1956        let _timer = self
1957            .db_metrics
1958            .op_metrics
1959            .rocksdb_get_latency_seconds
1960            .with_label_values(&[&self.cf])
1961            .start_timer();
1962        let perf_ctx = if self.get_sample_interval.sample() {
1963            Some(RocksDBPerfContext)
1964        } else {
1965            None
1966        };
1967        let key_buf = be_fix_int_ser(key)?;
1968        let res = self
1969            .rocksdb
1970            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1971            .map_err(typed_store_err_from_rocks_err)?;
1972        self.db_metrics
1973            .op_metrics
1974            .rocksdb_get_bytes
1975            .with_label_values(&[&self.cf])
1976            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1977        if perf_ctx.is_some() {
1978            self.db_metrics
1979                .read_perf_ctx_metrics
1980                .report_metrics(&self.cf);
1981        }
1982        match res {
1983            Some(data) => Ok(Some(
1984                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1985            )),
1986            None => Ok(None),
1987        }
1988    }
1989
1990    #[instrument(level = "trace", skip_all, err)]
1991    fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
1992        let _timer = self
1993            .db_metrics
1994            .op_metrics
1995            .rocksdb_get_latency_seconds
1996            .with_label_values(&[&self.cf])
1997            .start_timer();
1998        let perf_ctx = if self.get_sample_interval.sample() {
1999            Some(RocksDBPerfContext)
2000        } else {
2001            None
2002        };
2003        let key_buf = be_fix_int_ser(key)?;
2004        let res = self
2005            .rocksdb
2006            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
2007            .map_err(typed_store_err_from_rocks_err)?;
2008        self.db_metrics
2009            .op_metrics
2010            .rocksdb_get_bytes
2011            .with_label_values(&[&self.cf])
2012            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
2013        if perf_ctx.is_some() {
2014            self.db_metrics
2015                .read_perf_ctx_metrics
2016                .report_metrics(&self.cf);
2017        }
2018        match res {
2019            Some(data) => Ok(Some(data.to_vec())),
2020            None => Ok(None),
2021        }
2022    }
2023
2024    #[instrument(level = "trace", skip_all, err)]
2025    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
2026        let timer = self
2027            .db_metrics
2028            .op_metrics
2029            .rocksdb_put_latency_seconds
2030            .with_label_values(&[&self.cf])
2031            .start_timer();
2032        let perf_ctx = if self.write_sample_interval.sample() {
2033            Some(RocksDBPerfContext)
2034        } else {
2035            None
2036        };
2037        let key_buf = be_fix_int_ser(key)?;
2038        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
2039        self.db_metrics
2040            .op_metrics
2041            .rocksdb_put_bytes
2042            .with_label_values(&[&self.cf])
2043            .observe((key_buf.len() + value_buf.len()) as f64);
2044        if perf_ctx.is_some() {
2045            self.db_metrics
2046                .write_perf_ctx_metrics
2047                .report_metrics(&self.cf);
2048        }
2049        self.rocksdb
2050            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
2051            .map_err(typed_store_err_from_rocks_err)?;
2052
2053        let elapsed = timer.stop_and_record();
2054        if elapsed > 1.0 {
2055            warn!(?elapsed, cf = ?self.cf, "very slow insert");
2056            self.db_metrics
2057                .op_metrics
2058                .rocksdb_very_slow_puts_count
2059                .with_label_values(&[&self.cf])
2060                .inc();
2061            self.db_metrics
2062                .op_metrics
2063                .rocksdb_very_slow_puts_duration_ms
2064                .with_label_values(&[&self.cf])
2065                .inc_by((elapsed * 1000.0) as u64);
2066        }
2067
2068        Ok(())
2069    }
2070
2071    #[instrument(level = "trace", skip_all, err)]
2072    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
2073        let _timer = self
2074            .db_metrics
2075            .op_metrics
2076            .rocksdb_delete_latency_seconds
2077            .with_label_values(&[&self.cf])
2078            .start_timer();
2079        let perf_ctx = if self.write_sample_interval.sample() {
2080            Some(RocksDBPerfContext)
2081        } else {
2082            None
2083        };
2084        let key_buf = be_fix_int_ser(key)?;
2085        self.rocksdb
2086            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
2087            .map_err(typed_store_err_from_rocks_err)?;
2088        self.db_metrics
2089            .op_metrics
2090            .rocksdb_deletes
2091            .with_label_values(&[&self.cf])
2092            .inc();
2093        if perf_ctx.is_some() {
2094            self.db_metrics
2095                .write_perf_ctx_metrics
2096                .report_metrics(&self.cf);
2097        }
2098        Ok(())
2099    }
2100
2101    /// Deletes a range of keys between `from` (inclusive) and `to`
2102    /// (exclusive) by immediately deleting any sst files whose key
2103    /// range lies entirely within the range. Files whose range only partially
2104    /// overlaps with the range are not deleted. This can be useful for
2105    /// quickly removing a large amount of data without having
2106    /// to delete individual keys. Only files at level 1 or higher are
2107    /// considered (level 0 files are skipped). It doesn't guarantee that
2108    /// all keys in the range are deleted, as there might be keys in files
2109    /// that weren't entirely within the range.
2110    #[instrument(level = "trace", skip_all, err)]
2111    fn delete_file_in_range(&self, from: &K, to: &K) -> Result<(), TypedStoreError> {
2112        let from_buf = be_fix_int_ser(from.borrow())?;
2113        let to_buf = be_fix_int_ser(to.borrow())?;
2114        self.rocksdb
2115            .delete_file_in_range(&self.cf(), from_buf, to_buf)
2116            .map_err(typed_store_err_from_rocks_err)?;
2117        Ok(())
2118    }
2119
2120    /// This method first drops the existing column family and then creates a
2121    /// new one with the same name. The two operations are not atomic and
2122    /// hence it is possible to get into a race condition where the column
2123    /// family has been dropped but the new one has not been created yet.
2124    #[instrument(level = "trace", skip_all, err)]
2125    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
2126        let _ = self.rocksdb.drop_cf(&self.cf);
2127        self.rocksdb
2128            .create_cf(self.cf.clone(), &default_db_options().options)
2129            .map_err(typed_store_err_from_rocks_err)?;
2130        Ok(())
2131    }
2132
2133    /// Writes a range delete tombstone to delete all entries in the db map.
2134    /// If the DBMap is configured with `ignore_range_deletions` set to false,
2135    /// the effect of this write is visible immediately, i.e. you won't
2136    /// see old values when you do a lookup or scan. But if it is configured
2137    /// with `ignore_range_deletions` set to true, the old values remain visible
2138    /// until compaction actually deletes them, which happens sometime later. By
2139    /// default `ignore_range_deletions` is set to true on a DBMap (unless it is
2140    /// overridden in the config), so please use this function with caution.
2141    #[instrument(level = "trace", skip_all, err)]
2142    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2143        let mut iter = self.unbounded_iter().seek_to_first();
2144        let first_key = iter.next().map(|(k, _v)| k);
2145        let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
2146        if let Some((first_key, last_key)) = first_key.zip(last_key) {
2147            let mut batch = self.batch();
2148            batch.schedule_delete_range(self, &first_key, &last_key)?;
2149            batch.write()?;
2150        }
2151        Ok(())
2152    }
2153
2154    fn is_empty(&self) -> bool {
2155        self.safe_iter().next().is_none()
2156    }
2157
2158    /// Returns an unbounded iterator visiting each key-value pair in the map.
2159    /// This is potentially unsafe as it can perform a full table scan
2160    fn unbounded_iter(&'a self) -> Self::Iterator {
2161        let db_iter = self
2162            .rocksdb
2163            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2164        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2165        Iter::new(
2166            self.cf.clone(),
2167            db_iter,
2168            _timer,
2169            _perf_ctx,
2170            bytes_scanned,
2171            keys_scanned,
2172            Some(self.db_metrics.clone()),
2173        )
2174    }
2175
2176    /// Returns an iterator visiting each key-value pair in the map. By providing
2177    /// bounds of the scan range, RocksDB can avoid unnecessary scans.
2178    /// Lower bound is inclusive, while upper bound is exclusive.
2179    fn iter_with_bounds(
2180        &'a self,
2181        lower_bound: Option<K>,
2182        upper_bound: Option<K>,
2183    ) -> Self::Iterator {
2184        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2185        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2186        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2187        Iter::new(
2188            self.cf.clone(),
2189            db_iter,
2190            _timer,
2191            _perf_ctx,
2192            bytes_scanned,
2193            keys_scanned,
2194            Some(self.db_metrics.clone()),
2195        )
2196    }
2197
2198    /// Similar to `iter_with_bounds` but allows specifying
2199    /// inclusivity/exclusivity of ranges explicitly. TODO: find better name
2200    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2201        let readopts = self.create_read_options_with_range(range);
2202        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2203        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2204        Iter::new(
2205            self.cf.clone(),
2206            db_iter,
2207            _timer,
2208            _perf_ctx,
2209            bytes_scanned,
2210            keys_scanned,
2211            Some(self.db_metrics.clone()),
2212        )
2213    }
2214
2215    fn safe_iter(&'a self) -> Self::SafeIterator {
2216        let db_iter = self
2217            .rocksdb
2218            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2219        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2220        SafeIter::new(
2221            self.cf.clone(),
2222            db_iter,
2223            _timer,
2224            _perf_ctx,
2225            bytes_scanned,
2226            keys_scanned,
2227            Some(self.db_metrics.clone()),
2228        )
2229    }
2230
2231    fn safe_iter_with_bounds(
2232        &'a self,
2233        lower_bound: Option<K>,
2234        upper_bound: Option<K>,
2235    ) -> Self::SafeIterator {
2236        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2237        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2238        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2239        SafeIter::new(
2240            self.cf.clone(),
2241            db_iter,
2242            _timer,
2243            _perf_ctx,
2244            bytes_scanned,
2245            keys_scanned,
2246            Some(self.db_metrics.clone()),
2247        )
2248    }
2249
2250    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2251        let readopts = self.create_read_options_with_range(range);
2252        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2253        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2254        SafeIter::new(
2255            self.cf.clone(),
2256            db_iter,
2257            _timer,
2258            _perf_ctx,
2259            bytes_scanned,
2260            keys_scanned,
2261            Some(self.db_metrics.clone()),
2262        )
2263    }
2264
2265    fn keys(&'a self) -> Self::Keys {
2266        let mut db_iter = self
2267            .rocksdb
2268            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2269        db_iter.seek_to_first();
2270
2271        Keys::new(db_iter)
2272    }
2273
2274    fn values(&'a self) -> Self::Values {
2275        let mut db_iter = self
2276            .rocksdb
2277            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2278        db_iter.seek_to_first();
2279
2280        Values::new(db_iter)
2281    }
2282
2283    /// Returns a vector of raw values corresponding to the keys provided.
2284    #[instrument(level = "trace", skip_all, err)]
2285    fn multi_get_raw_bytes<J>(
2286        &self,
2287        keys: impl IntoIterator<Item = J>,
2288    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
2289    where
2290        J: Borrow<K>,
2291    {
2292        let results = self
2293            .multi_get_pinned(keys)?
2294            .into_iter()
2295            .map(|val| val.map(|v| v.to_vec()))
2296            .collect();
2297        Ok(results)
2298    }
2299
2300    /// Returns a vector of values corresponding to the keys provided.
2301    #[instrument(level = "trace", skip_all, err)]
2302    fn multi_get<J>(
2303        &self,
2304        keys: impl IntoIterator<Item = J>,
2305    ) -> Result<Vec<Option<V>>, TypedStoreError>
2306    where
2307        J: Borrow<K>,
2308    {
2309        let results = self.multi_get_pinned(keys)?;
2310        let values_parsed: Result<Vec<_>, TypedStoreError> = results
2311            .into_iter()
2312            .map(|value_byte| match value_byte {
2313                Some(data) => Ok(Some(
2314                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2315                )),
2316                None => Ok(None),
2317            })
2318            .collect();
2319
2320        values_parsed
2321    }
2322
2323    /// Returns a vector of values corresponding to the keys provided, fetched in chunks of `chunk_size` from a single snapshot.
2324    #[instrument(level = "trace", skip_all, err)]
2325    fn chunked_multi_get<J>(
2326        &self,
2327        keys: impl IntoIterator<Item = J>,
2328        chunk_size: usize,
2329    ) -> Result<Vec<Option<V>>, TypedStoreError>
2330    where
2331        J: Borrow<K>,
2332    {
2333        let cf = self.cf();
2334        let keys_bytes = keys
2335            .into_iter()
2336            .map(|k| (&cf, be_fix_int_ser(k.borrow()).unwrap()));
2337        let chunked_keys = keys_bytes.into_iter().chunks(chunk_size);
2338        let snapshot = self.snapshot()?;
2339        let mut results = vec![];
2340        for chunk in chunked_keys.into_iter() {
2341            let chunk_result = snapshot.multi_get_cf(chunk);
2342            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
2343                .into_iter()
2344                .map(|value_byte| {
2345                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
2346                    match value_byte {
2347                        Some(data) => Ok(Some(
2348                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2349                        )),
2350                        None => Ok(None),
2351                    }
2352                })
2353                .collect();
2354            results.extend(values_parsed?);
2355        }
2356        Ok(results)
2357    }
2358
2359    /// Convenience method for batch insertion
2360    #[instrument(level = "trace", skip_all, err)]
2361    fn multi_insert<J, U>(
2362        &self,
2363        key_val_pairs: impl IntoIterator<Item = (J, U)>,
2364    ) -> Result<(), Self::Error>
2365    where
2366        J: Borrow<K>,
2367        U: Borrow<V>,
2368    {
2369        let mut batch = self.batch();
2370        batch.insert_batch(self, key_val_pairs)?;
2371        batch.write()
2372    }
2373
2374    /// Convenience method for batch removal
2375    #[instrument(level = "trace", skip_all, err)]
2376    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2377    where
2378        J: Borrow<K>,
2379    {
2380        let mut batch = self.batch();
2381        batch.delete_batch(self, keys)?;
2382        batch.write()
2383    }
2384
2385    /// Try to catch up with primary when running as secondary
2386    #[instrument(level = "trace", skip_all, err)]
2387    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2388        self.rocksdb
2389            .try_catch_up_with_primary()
2390            .map_err(typed_store_err_from_rocks_err)
2391    }
2392}
2393
2394impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
2395where
2396    J: Borrow<K>,
2397    U: Borrow<V>,
2398    K: Serialize,
2399    V: Serialize,
2400{
2401    type Error = TypedStoreError;
2402
2403    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
2404    where
2405        T: Iterator<Item = (J, U)>,
2406    {
2407        let mut batch = self.batch();
2408        batch.insert_batch(self, iter)?;
2409        batch.write()
2410    }
2411
2412    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
2413        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
2414        let mut batch = self.batch();
2415        batch.insert_batch(self, slice_of_refs)?;
2416        batch.write()
2417    }
2418}
2419
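/// Reads a `usize` from the environment variable `var_name`. Returns `None` if
/// the variable is unset; logs a warning and returns `None` if it is set but
/// cannot be parsed as an integer.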
2420pub fn read_size_from_env(var_name: &str) -> Option<usize> {
2421    env::var(var_name)
2422        .ok()?
2423        .parse::<usize>()
2424        .tap_err(|e| {
2425            warn!(
2426                "Env var {} does not contain valid usize integer: {}",
2427                var_name, e
2428            )
2429        })
2430        .ok()
2431}
2432
2433#[derive(Clone, Debug)]
2434pub struct ReadWriteOptions {
2435    pub ignore_range_deletions: bool,
2436    // Whether to sync to disk on every write.
2437    sync_to_disk: bool,
2438}
2439
2440impl ReadWriteOptions {
2441    pub fn readopts(&self) -> ReadOptions {
2442        let mut readopts = ReadOptions::default();
2443        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
2444        readopts
2445    }
2446
2447    pub fn writeopts(&self) -> WriteOptions {
2448        let mut opts = WriteOptions::default();
2449        opts.set_sync(self.sync_to_disk);
2450        opts
2451    }
2452
2453    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
2454        self.ignore_range_deletions = ignore;
2455        self
2456    }
2457}
2458
2459impl Default for ReadWriteOptions {
2460    fn default() -> Self {
2461        Self {
2462            ignore_range_deletions: true,
2463            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
2464        }
2465    }
2466}
2467// TODO: refactor this into a builder pattern, where rocksdb::Options are
2468// generated after a call to build().
2469#[derive(Default, Clone)]
2470pub struct DBOptions {
2471    pub options: rocksdb::Options,
2472    pub rw_options: ReadWriteOptions,
2473}
2474
2475impl DBOptions {
2476    // Optimize lookup perf for tables where no scans are performed.
2477    // If non-trivial number of values can be > 512B in size, it is beneficial to
2478    // also specify optimize_for_large_values_no_scan().
2479    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
2480        // NOTE: this overwrites the block options.
2481        self.options
2482            .optimize_for_point_lookup(block_cache_size_mb as u64);
2483        self
2484    }
2485
2486    // Optimize write and lookup perf for tables which are rarely scanned, and have
2487    // large values. https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
2488    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
2489        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
2490            info!("Large value blob storage optimization is disabled via env var.");
2491            return self;
2492        }
2493
2494        // Blob settings.
2495        self.options.set_enable_blob_files(true);
2496        self.options
2497            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
2498        self.options.set_enable_blob_gc(true);
2499        // Since each blob can have non-trivial size overhead, and compression does not
2500        // work across blobs, set a min blob size in bytes so that small
2501        // transactions and effects are kept in sst files.
2502        self.options.set_min_blob_size(min_blob_size);
2503
2504        // Increase write buffer size to 256MiB.
2505        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2506            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2507            * 1024
2508            * 1024;
2509        self.options.set_write_buffer_size(write_buffer_size);
2510        // Since large blobs are not in sst files, reduce the target file size and base
2511        // level target size.
2512        let target_file_size_base = 64 << 20;
2513        self.options
2514            .set_target_file_size_base(target_file_size_base);
2515        // Level 1 defaults to 64MiB * 4 ~ 256MiB.
2516        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2517            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2518        self.options
2519            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
2520
2521        self
2522    }
2523
2524    // Optimize tables with a mix of lookup and scan workloads.
2525    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
2526        self.options
2527            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
2528        self
2529    }
2530
2531    // Optimize DB receiving significant insertions.
2532    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
2533        self.options
2534            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
2535        self.options
2536            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
2537        self
2538    }
2539
2540    // Optimize tables receiving significant insertions.
2541    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
2542        // Increase write buffer size to 256MiB.
2543        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2544            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2545            * 1024
2546            * 1024;
2547        self.options.set_write_buffer_size(write_buffer_size);
2548        // Allow up to 6 write buffers before slowing down writes.
2549        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2550            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2551        self.options
2552            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2553        // Keep 1 write buffer so recent writes can be read from memory.
2554        self.options
2555            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2556
2557        // Set the level 0 file-count compaction trigger (4 files by default).
2558        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2559            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2560        self.options.set_level_zero_file_num_compaction_trigger(
2561            max_level_zero_file_num.try_into().unwrap(),
2562        );
2563        self.options.set_level_zero_slowdown_writes_trigger(
2564            (max_level_zero_file_num * 12).try_into().unwrap(),
2565        );
2566        self.options
2567            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2568
2569        // Increase sst file size to 128MiB.
2570        self.options.set_target_file_size_base(
2571            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2572                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2573                * 1024
2574                * 1024,
2575        );
2576
2577        // Increase level 1 target size to 256MiB * 4 ~ 1GiB.
2578        self.options
2579            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2580
2581        self
2582    }
2583
2584    // Optimize tables receiving significant insertions, without any deletions.
2585    // TODO: merge this function with optimize_for_write_throughput(), and use a
2586    // flag to indicate if deletion is received.
2587    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
2588        // Increase write buffer size to 256MiB.
2589        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2590            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2591            * 1024
2592            * 1024;
2593        self.options.set_write_buffer_size(write_buffer_size);
2594        // Allow up to 6 write buffers before slowing down writes.
2595        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2596            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2597        self.options
2598            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2599        // Keep 1 write buffer so recent writes can be read from memory.
2600        self.options
2601            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2602
2603        // Switch to universal compactions.
2604        self.options
2605            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
2606        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
2607        compaction_options.set_max_size_amplification_percent(10000);
2608        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
2609        self.options
2610            .set_universal_compaction_options(&compaction_options);
2611
2612        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2613            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
2614        self.options.set_level_zero_file_num_compaction_trigger(
2615            max_level_zero_file_num.try_into().unwrap(),
2616        );
2617        self.options.set_level_zero_slowdown_writes_trigger(
2618            (max_level_zero_file_num * 12).try_into().unwrap(),
2619        );
2620        self.options
2621            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2622
2623        // Increase sst file size to 128MiB.
2624        self.options.set_target_file_size_base(
2625            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2626                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2627                * 1024
2628                * 1024,
2629        );
2630
2631        // This should be a no-op for universal compaction but increasing it to be safe.
2632        self.options
2633            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2634
2635        self
2636    }
2637
2638    // Overrides the block options with different block cache size and block size.
2639    pub fn set_block_options(
2640        mut self,
2641        block_cache_size_mb: usize,
2642        block_size_bytes: usize,
2643    ) -> DBOptions {
2644        self.options
2645            .set_block_based_table_factory(&get_block_options(
2646                block_cache_size_mb,
2647                block_size_bytes,
2648            ));
2649        self
2650    }
2651
2652    // Disables write stalling and stopping based on pending compaction bytes.
2653    pub fn disable_write_throttling(mut self) -> DBOptions {
2654        self.options.set_soft_pending_compaction_bytes_limit(0);
2655        self.options.set_hard_pending_compaction_bytes_limit(0);
2656        self
2657    }
2658}
2659
2660/// Creates the default RocksDB options, to be used when no options are
2661/// specified.
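///
/// A minimal sketch (not compiled as a doctest) of tuning the defaults for a
/// write-heavy table with a hypothetical 64MiB block cache budget:
///
/// ```ignore
/// let db_options = default_db_options()
///     .optimize_for_write_throughput()
///     .optimize_for_read(64); // 64MiB block cache
/// ```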
2662pub fn default_db_options() -> DBOptions {
2663    let mut opt = rocksdb::Options::default();
2664
2665    // One common issue when running tests on Mac is that the default ulimit is too
2666    // low, leading to I/O errors such as "Too many open files". Raising fdlimit
2667    // to bypass it.
2668    if let Some(limit) = fdlimit::raise_fd_limit() {
2669        // On Windows, raise_fd_limit returns None.
2670        opt.set_max_open_files((limit / 8) as i32);
2671    }
2672
2673    // The table cache is locked for updates and this determines the number
2674    // of shards, i.e. 2^10. Increase in case of lock contention.
2675    opt.set_table_cache_num_shard_bits(10);
2676
2677    // LSM compression settings
2678    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
2679    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
2680    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
2681
2682    // IOTA uses multiple RocksDB instances in a node, so the total sizes of write buffers and WAL
2683    // can be higher than the limits below.
2684    //
2685    // RocksDB also exposes the option to configure total write buffer size across
2686    // multiple instances via `write_buffer_manager`. But the write buffer flush
2687    // policy (flushing the buffer receiving the next write) may not work well.
2688    // So sticking to per-db write buffer size limit for now.
2689    //
2690    // The environment variables are only meant to be emergency overrides. They may
2691    // go away in future. It is preferable to update the default value, or
2692    // override the option in code.
2693    opt.set_db_write_buffer_size(
2694        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
2695            * 1024
2696            * 1024,
2697    );
2698    opt.set_max_total_wal_size(
2699        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
2700    );
2701
2702    // Num threads for compactions and memtable flushes.
2703    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
2704
2705    opt.set_enable_pipelined_write(true);
2706
2707    // Increase block size to 16KiB.
2708    // https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md#indexes-and-filter-blocks
2709    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
2710
2711    // Set memtable bloomfilter.
2712    opt.set_memtable_prefix_bloom_ratio(0.02);
2713
2714    DBOptions {
2715        options: opt,
2716        rw_options: ReadWriteOptions::default(),
2717    }
2718}
2719
2720fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
2721    // Set options mostly similar to those used in optimize_for_point_lookup(),
2722    // except non-default binary and hash index, to hopefully reduce lookup
2723    // latencies without causing any regression for scanning, with slightly more
2724    // memory usages. https://github.com/facebook/rocksdb/blob/11cb6af6e5009c51794641905ca40ce5beec7fee/options/options.cc#L611-L621
2725    let mut block_options = BlockBasedOptions::default();
2726    // Overrides block size.
2727    block_options.set_block_size(block_size_bytes);
2728    // Configure a block cache.
2729    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
2730    // Set a bloomfilter with 1% false positive rate.
2731    block_options.set_bloom_filter(10.0, false);
2732    // From https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#caching-index-and-filter-blocks
2733    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
2734    block_options
2735}
2736
2737/// Opens a database with options, and a number of column families that are
2738/// created if they do not exist.
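///
/// A minimal sketch (not compiled as a doctest), assuming `metric_conf` is an
/// existing `MetricConf` for this database:
///
/// ```ignore
/// let rocks = open_cf("/tmp/my_db", None, metric_conf, &["first_cf", "second_cf"])?;
/// ```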
2739#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
2740pub fn open_cf<P: AsRef<Path>>(
2741    path: P,
2742    db_options: Option<rocksdb::Options>,
2743    metric_conf: MetricConf,
2744    opt_cfs: &[&str],
2745) -> Result<Arc<RocksDB>, TypedStoreError> {
2746    let options = db_options.unwrap_or_else(|| default_db_options().options);
2747    let column_descriptors: Vec<_> = opt_cfs
2748        .iter()
2749        .map(|name| (*name, options.clone()))
2750        .collect();
2751    open_cf_opts(
2752        path,
2753        Some(options.clone()),
2754        metric_conf,
2755        &column_descriptors[..],
2756    )
2757}
2758
2759fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
2760    // Customize database options
2761    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2762    options.create_if_missing(true);
2763    options.create_missing_column_families(true);
2764    options
2765}
2766
2767/// Opens a database with options, and a number of column families with
2768/// individual options that are created if they do not exist.
2769#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2770pub fn open_cf_opts<P: AsRef<Path>>(
2771    path: P,
2772    db_options: Option<rocksdb::Options>,
2773    metric_conf: MetricConf,
2774    opt_cfs: &[(&str, rocksdb::Options)],
2775) -> Result<Arc<RocksDB>, TypedStoreError> {
2776    let path = path.as_ref();
2777    // In the simulator, we intercept the wall clock in the test thread only. This
2778    // causes problems because rocksdb uses the simulated clock when creating
2779    // its background threads, but then those threads see the real wall clock
2780    // (because they are not the test thread), which causes rocksdb to panic.
2781    // The `nondeterministic` macro evaluates expressions in new threads, which
2782    // resolves the issue.
2783    //
2784    // This is a no-op in non-simulator builds.
2785
2786    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2787    nondeterministic!({
2788        let options = prepare_db_options(db_options);
2789        let rocksdb = {
2790            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2791                &options,
2792                path,
2793                cfs.into_iter()
2794                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2795            )
2796            .map_err(typed_store_err_from_rocks_err)?
2797        };
2798        Ok(Arc::new(RocksDB::DBWithThreadMode(
2799            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2800        )))
2801    })
2802}
2803
2804/// Opens a database with options, and a number of column families with
2805/// individual options that are created if they do not exist.
2806#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2807pub fn open_cf_opts_transactional<P: AsRef<Path>>(
2808    path: P,
2809    db_options: Option<rocksdb::Options>,
2810    metric_conf: MetricConf,
2811    opt_cfs: &[(&str, rocksdb::Options)],
2812) -> Result<Arc<RocksDB>, TypedStoreError> {
2813    let path = path.as_ref();
2814    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2815    // See comment above for explanation of why nondeterministic is necessary here.
2816    nondeterministic!({
2817        let options = prepare_db_options(db_options);
2818        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
2819            &options,
2820            path,
2821            cfs.into_iter()
2822                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2823        )
2824        .map_err(typed_store_err_from_rocks_err)?;
2825        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
2826            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2827        )))
2828    })
2829}
2830
2831/// Opens a database as a read-only secondary instance of the database at
2832/// `primary_path`, with a number of column families with individual options.
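///
/// A minimal sketch (not compiled as a doctest), assuming `metric_conf` is an
/// existing `MetricConf` and the primary database lives at `/tmp/primary_db`:
///
/// ```ignore
/// let secondary = open_cf_opts_secondary(
///     "/tmp/primary_db",
///     Some("/tmp/secondary_db"),
///     None,
///     metric_conf,
///     &[("first_cf", rocksdb::Options::default())],
/// )?;
/// // Periodically replay the primary's WAL to pick up new writes.
/// secondary.try_catch_up_with_primary()?;
/// ```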
2833pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2834    primary_path: P,
2835    secondary_path: Option<P>,
2836    db_options: Option<rocksdb::Options>,
2837    metric_conf: MetricConf,
2838    opt_cfs: &[(&str, rocksdb::Options)],
2839) -> Result<Arc<RocksDB>, TypedStoreError> {
2840    let primary_path = primary_path.as_ref();
2841    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2842    // See comment above for explanation of why nondeterministic is necessary here.
2843    nondeterministic!({
2844        // Customize database options
2845        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2846
2847        fdlimit::raise_fd_limit();
2848        // This is a requirement by RocksDB when opening as secondary
2849        options.set_max_open_files(-1);
2850
2851        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2852        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2853            .ok()
2854            .unwrap_or_default();
2855
2856        let default_db_options = default_db_options();
2857        // Add CFs not explicitly listed
2858        for cf_key in cfs.iter() {
2859            if !opt_cfs.contains_key(&cf_key[..]) {
2860                opt_cfs.insert(cf_key, default_db_options.options.clone());
2861            }
2862        }
2863
2864        let primary_path = primary_path.to_path_buf();
2865        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2866            let mut s = primary_path.clone();
2867            s.pop();
2868            s.push("SECONDARY");
2869            s.as_path().to_path_buf()
2870        });
2871
2872        let rocksdb = {
2873            options.create_if_missing(true);
2874            options.create_missing_column_families(true);
2875            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2876                &options,
2877                &primary_path,
2878                &secondary_path,
2879                opt_cfs
2880                    .iter()
2881                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2882            )
2883            .map_err(typed_store_err_from_rocks_err)?;
2884            db.try_catch_up_with_primary()
2885                .map_err(typed_store_err_from_rocks_err)?;
2886            db
2887        };
2888        Ok(Arc::new(RocksDB::DBWithThreadMode(
2889            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
2890        )))
2891    })
2892}
2893
2894pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2895    const DB_DEFAULT_CF_NAME: &str = "default";
2896
2897    let opts = rocksdb::Options::default();
2898    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2899        .map_err(|e| e.into())
2900        .map(|q| {
2901            q.iter()
2902                .filter_map(|s| {
2903                    // The `default` table is not used
2904                    if s != DB_DEFAULT_CF_NAME {
2905                        Some(s.clone())
2906                    } else {
2907                        None
2908                    }
2909                })
2910                .collect()
2911        })
2912}
2913
2914/// Serializes a key using big-endian, fixed-width integer encoding so that the byte order RocksDB uses for iterator seeks matches the keys' natural ordering; see `https://github.com/facebook/rocksdb/wiki/Iterator#introduction`
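///
/// A minimal sketch (not compiled as a doctest) of the ordering property:
///
/// ```ignore
/// let a = be_fix_int_ser(&1u64)?;
/// let b = be_fix_int_ser(&2u64)?;
/// // Big-endian fixed-width encoding preserves numeric order under byte comparison.
/// assert!(a < b);
/// ```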
2915#[inline]
2916pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2917where
2918    S: ?Sized + serde::Serialize,
2919{
2920    bincode::DefaultOptions::new()
2921        .with_big_endian()
2922        .with_fixint_encoding()
2923        .serialize(t)
2924        .map_err(typed_store_err_from_bincode_err)
2925}
2926
2927#[derive(Clone)]
2928pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
2929impl DBMapTableConfigMap {
2930    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
2931        Self(map)
2932    }
2933
2934    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
2935        self.0.clone()
2936    }
2937}
2938
2939pub enum RocksDBAccessType {
2940    Primary,
2941    Secondary(Option<PathBuf>),
2942}
2943
2944pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
2945    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
2946}
2947
2948fn populate_missing_cfs(
2949    input_cfs: &[(&str, rocksdb::Options)],
2950    path: &Path,
2951) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2952    let mut cfs = vec![];
2953    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2954    let existing_cfs =
2955        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2956            .ok()
2957            .unwrap_or_default();
2958
2959    for cf_name in existing_cfs {
2960        if !input_cf_index.contains(&cf_name[..]) {
2961            cfs.push((cf_name, rocksdb::Options::default()));
2962        }
2963    }
2964    cfs.extend(
2965        input_cfs
2966            .iter()
2967            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2968    );
2969    Ok(cfs)
2970}
2971
2972/// Given a vec<u8>, find the value which is one more than the vector
2973/// if the vector was a big-endian number.
2974/// If the vector is already at the maximum, don't change it.
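///
/// For example, `[0x00, 0xFF]` becomes `[0x01, 0x00]`, while `[0xFF, 0xFF]`
/// stays unchanged (saturating behavior).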
2975fn big_endian_saturating_add_one(v: &mut [u8]) {
2976    if is_max(v) {
2977        return;
2978    }
2979    for i in (0..v.len()).rev() {
2980        if v[i] == u8::MAX {
2981            v[i] = 0;
2982        } else {
2983            v[i] += 1;
2984            break;
2985        }
2986    }
2987}
2988
2989/// Check if all the bytes in the vector are 0xFF
2990fn is_max(v: &[u8]) -> bool {
2991    v.iter().all(|&x| x == u8::MAX)
2992}
2993
2994// `clippy::manual_div_ceil` is raised by code expanded by the
2995// `uint::construct_uint!` macro so it needs to be fixed by `uint`
2996#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
2997#[test]
2998fn test_helpers() {
2999    let v = vec![];
3000    assert!(is_max(&v));
3001
3002    fn check_add(v: Vec<u8>) {
3003        let mut v = v;
3004        let num = Num32::from_big_endian(&v);
3005        big_endian_saturating_add_one(&mut v);
3006        assert!(num + 1 == Num32::from_big_endian(&v));
3007    }
3008
3009    uint::construct_uint! {
3010        // 32 byte number
3011        struct Num32(4);
3012    }
3013
3014    let mut v = vec![255; 32];
3015    big_endian_saturating_add_one(&mut v);
3016    assert!(Num32::MAX == Num32::from_big_endian(&v));
3017
3018    check_add(vec![1; 32]);
3019    check_add(vec![6; 32]);
3020    check_add(vec![254; 32]);
3021
3022    // TBD: More tests coming with randomized arrays
3023}