typed_store/rocks/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod errors;
6pub(crate) mod iter;
7pub(crate) mod keys;
8pub(crate) mod safe_iter;
9pub mod util;
10pub(crate) mod values;
11
12use std::{
13    borrow::Borrow,
14    collections::{BTreeMap, HashSet},
15    env,
16    ffi::CStr,
17    marker::PhantomData,
18    ops::{Bound, RangeBounds},
19    path::{Path, PathBuf},
20    sync::Arc,
21    time::Duration,
22};
23
24use bincode::Options;
25use collectable::TryExtend;
26use iota_macros::{fail_point, nondeterministic};
27use itertools::Itertools;
28use prometheus::{Histogram, HistogramTimer};
29use rocksdb::{
30    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
31    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
32    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
33    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
34    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
35};
36use serde::{Serialize, de::DeserializeOwned};
37use tap::TapFallible;
38use tokio::sync::oneshot;
39use tracing::{debug, error, info, instrument, warn};
40
41use self::{iter::Iter, keys::Keys, values::Values};
42use crate::{
43    TypedStoreError,
44    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
45    rocks::{
46        errors::{
47            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
48            typed_store_err_from_rocks_err,
49        },
50        safe_iter::SafeIter,
51    },
52    traits::{Map, TableSummary},
53};
54
// Write buffer size per RocksDB instance can be set via the env var below.
// If the env var is not set, the default value (in MiB) is used.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// Write ahead log size per RocksDB instance can be set via the env var below.
// If the env var is not set, the default value (in MiB) is used.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;
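// For example, launching the process with `DB_WRITE_BUFFER_SIZE_MB=2048` and
// `DB_WAL_SIZE_MB=2048` in the environment raises both limits to 2 GiB per
// RocksDB instance.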
64
// Environment variables to control the behavior of write-throughput-optimized
// tables.
67const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
68const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
69const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
70const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
71const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
72const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
73const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
74const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
75const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;
76
77// Set to 1 to disable blob storage for transactions and effects.
78const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
79
80const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
81
82// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
83// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
84const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
85    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
86
87#[cfg(test)]
88mod tests;
89
90/// A helper macro to reopen multiple column families. The macro returns
91/// a tuple of DBMap structs in the same order that the column families
92/// are defined.
93///
94/// # Arguments
95///
96/// * `db` - a reference to a rocks DB object
/// * `cf;<ty,ty>` - a comma-separated list of column families to open. For each
///   column family, the column family name (cf) and its key-value types
///   <ty, ty> must be provided.
100///
101/// # Examples
102///
103/// We successfully open two different column families.
/// ```
/// use typed_store::reopen;
/// use typed_store::rocks::*;
/// use tempfile::tempdir;
/// use prometheus::Registry;
/// use std::sync::Arc;
/// use typed_store::metrics::DBMetrics;
/// use core::fmt::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     const FIRST_CF: &str = "First_CF";
///     const SECOND_CF: &str = "Second_CF";
///
///     /// Create the rocks database reference for the desired column families
///     let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &[FIRST_CF, SECOND_CF]).unwrap();
///
///     /// Now simply open all the column families for their expected Key-Value types
///     let (db_map_1, db_map_2) = reopen!(&rocks, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
///     Ok(())
/// }
/// ```
127#[macro_export]
128macro_rules! reopen {
129    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
130        (
131            $(
132                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
133            ),*
134        )
135    };
136}
137
/// Repeatedly attempts an optimistic transaction until it succeeds.
///
/// Since many callsites (e.g. the consensus handler) cannot proceed in the case
/// of failed writes, the retry limit can be lifted: by default the macro gives
/// up after 20 retries, but passing `None` as the retry limit (or using
/// `retry_transaction_forever!`) makes it loop until the transaction succeeds.
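///
/// A minimal usage sketch (not compiled as a doctest; `do_transactional_write`
/// is a hypothetical helper that returns
/// `Err(TypedStoreError::RetryableTransaction)` on write conflicts):
///
/// ```ignore
/// let result = retry_transaction!(do_transactional_write(), Some(5));
/// ```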
141#[macro_export]
142macro_rules! retry_transaction {
143    ($transaction:expr) => {
144        retry_transaction!($transaction, Some(20))
145    };
146
147    (
148        $transaction:expr,
149        $max_retries:expr // should be an Option<int type>, None for unlimited
150        $(,)?
151
152    ) => {{
153        use rand::{
154            distributions::{Distribution, Uniform},
155            rngs::ThreadRng,
156        };
157        use tokio::time::{Duration, sleep};
158        use tracing::{error, info};
159
160        let mut retries = 0;
161        let max_retries = $max_retries;
162        loop {
163            let status = $transaction;
164            match status {
165                Err(TypedStoreError::RetryableTransaction) => {
166                    retries += 1;
167                    // Randomized delay to help racing transactions get out of each other's way.
168                    let delay = {
169                        let mut rng = ThreadRng::default();
170                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
171                    };
172                    if let Some(max_retries) = max_retries {
173                        if retries > max_retries {
174                            error!(?max_retries, "max retries exceeded");
175                            break status;
176                        }
177                    }
178                    if retries > 10 {
179                        // TODO: monitoring needed?
180                        error!(?delay, ?retries, "excessive transaction retries...");
181                    } else {
182                        info!(
183                            ?delay,
184                            ?retries,
185                            "transaction write conflict detected, sleeping"
186                        );
187                    }
188                    sleep(delay).await;
189                }
190                _ => break status,
191            }
192        }
193    }};
194}
195
196#[macro_export]
197macro_rules! retry_transaction_forever {
198    ($transaction:expr) => {
199        $crate::retry_transaction!($transaction, None)
200    };
201}
202
203#[derive(Debug)]
204pub struct DBWithThreadModeWrapper {
205    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
206    pub metric_conf: MetricConf,
207    pub db_path: PathBuf,
208}
209
210impl DBWithThreadModeWrapper {
211    fn new(
212        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
213        metric_conf: MetricConf,
214        db_path: PathBuf,
215    ) -> Self {
216        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
217        Self {
218            underlying,
219            metric_conf,
220            db_path,
221        }
222    }
223}
224
225impl Drop for DBWithThreadModeWrapper {
226    fn drop(&mut self) {
227        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
228    }
229}
230
231#[derive(Debug)]
232pub struct OptimisticTransactionDBWrapper {
233    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
234    pub metric_conf: MetricConf,
235    pub db_path: PathBuf,
236}
237
238impl OptimisticTransactionDBWrapper {
239    fn new(
240        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
241        metric_conf: MetricConf,
242        db_path: PathBuf,
243    ) -> Self {
244        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
245        Self {
246            underlying,
247            metric_conf,
248            db_path,
249        }
250    }
251}
252
253impl Drop for OptimisticTransactionDBWrapper {
254    fn drop(&mut self) {
255        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
256    }
257}
258
/// Thin wrapper to unify the interface across different DB types.
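///
/// Calls are forwarded to whichever backend the enum holds, e.g. (sketch with a
/// hypothetical `rocks_db` value):
///
/// ```ignore
/// let bytes: Option<Vec<u8>> = rocks_db.get(b"some-key")?;
/// ```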
260#[derive(Debug)]
261pub enum RocksDB {
262    DBWithThreadMode(DBWithThreadModeWrapper),
263    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
264}
265
266macro_rules! delegate_call {
267    ($self:ident.$method:ident($($args:ident),*)) => {
268        match $self {
269            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
270            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
271        }
272    }
273}
274
275impl Drop for RocksDB {
276    fn drop(&mut self) {
277        delegate_call!(self.cancel_all_background_work(/* wait */ true))
278    }
279}
280
281impl RocksDB {
282    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
283        delegate_call!(self.get(key))
284    }
285
286    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
287        &'a self,
288        keys: I,
289        readopts: &ReadOptions,
290    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
291    where
292        K: AsRef<[u8]>,
293        I: IntoIterator<Item = (&'b W, K)>,
294        W: 'b + AsColumnFamilyRef,
295    {
296        delegate_call!(self.multi_get_cf_opt(keys, readopts))
297    }
298
299    pub fn batched_multi_get_cf_opt<I, K>(
300        &self,
301        cf: &impl AsColumnFamilyRef,
302        keys: I,
303        sorted_input: bool,
304        readopts: &ReadOptions,
305    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
306    where
307        I: IntoIterator<Item = K>,
308        K: AsRef<[u8]>,
309    {
310        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
311    }
312
313    pub fn property_int_value_cf(
314        &self,
315        cf: &impl AsColumnFamilyRef,
316        name: impl CStrLike,
317    ) -> Result<Option<u64>, rocksdb::Error> {
318        delegate_call!(self.property_int_value_cf(cf, name))
319    }
320
321    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
322        &self,
323        cf: &impl AsColumnFamilyRef,
324        key: K,
325        readopts: &ReadOptions,
326    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
327        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
328    }
329
330    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
331        delegate_call!(self.cf_handle(name))
332    }
333
334    pub fn create_cf<N: AsRef<str>>(
335        &self,
336        name: N,
337        opts: &rocksdb::Options,
338    ) -> Result<(), rocksdb::Error> {
339        delegate_call!(self.create_cf(name, opts))
340    }
341
342    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
343        delegate_call!(self.drop_cf(name))
344    }
345
346    pub fn delete_file_in_range<K: AsRef<[u8]>>(
347        &self,
348        cf: &impl AsColumnFamilyRef,
349        from: K,
350        to: K,
351    ) -> Result<(), rocksdb::Error> {
352        delegate_call!(self.delete_file_in_range_cf(cf, from, to))
353    }
354
355    pub fn delete_cf<K: AsRef<[u8]>>(
356        &self,
357        cf: &impl AsColumnFamilyRef,
358        key: K,
359        writeopts: &WriteOptions,
360    ) -> Result<(), rocksdb::Error> {
361        fail_point!("delete-cf-before");
362        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
363        fail_point!("delete-cf-after");
364        #[expect(clippy::let_and_return)]
365        ret
366    }
367
368    pub fn path(&self) -> &Path {
369        delegate_call!(self.path())
370    }
371
372    pub fn put_cf<K, V>(
373        &self,
374        cf: &impl AsColumnFamilyRef,
375        key: K,
376        value: V,
377        writeopts: &WriteOptions,
378    ) -> Result<(), rocksdb::Error>
379    where
380        K: AsRef<[u8]>,
381        V: AsRef<[u8]>,
382    {
383        fail_point!("put-cf-before");
384        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
385        fail_point!("put-cf-after");
386        #[expect(clippy::let_and_return)]
387        ret
388    }
389
390    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
391        &self,
392        cf: &impl AsColumnFamilyRef,
393        key: K,
394        readopts: &ReadOptions,
395    ) -> bool {
396        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
397    }
398
399    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
400        delegate_call!(self.try_catch_up_with_primary())
401    }
402
403    pub fn write(
404        &self,
405        batch: RocksDBBatch,
406        writeopts: &WriteOptions,
407    ) -> Result<(), TypedStoreError> {
408        fail_point!("batch-write-before");
409        let ret = match (self, batch) {
410            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
411                db.underlying
412                    .write_opt(batch, writeopts)
413                    .map_err(typed_store_err_from_rocks_err)?;
414                Ok(())
415            }
416            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
417                db.underlying
418                    .write_opt(batch, writeopts)
419                    .map_err(typed_store_err_from_rocks_err)?;
420                Ok(())
421            }
422            _ => Err(TypedStoreError::RocksDB(
423                "using invalid batch type for the database".to_string(),
424            )),
425        };
426        fail_point!("batch-write-after");
427        #[expect(clippy::let_and_return)]
428        ret
429    }
430
431    pub fn transaction_without_snapshot(
432        &self,
433    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
434        match self {
435            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
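            // Transactions are only supported by the OptimisticTransactionDB backend.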
436            Self::DBWithThreadMode(_) => panic!(),
437        }
438    }
439
440    pub fn transaction(
441        &self,
442    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
443        match self {
444            Self::OptimisticTransactionDB(db) => {
445                let mut tx_opts = OptimisticTransactionOptions::new();
446                tx_opts.set_snapshot(true);
447
448                Ok(db
449                    .underlying
450                    .transaction_opt(&WriteOptions::default(), &tx_opts))
451            }
452            Self::DBWithThreadMode(_) => panic!(),
453        }
454    }
455
456    pub fn raw_iterator_cf<'a: 'b, 'b>(
457        &'a self,
458        cf_handle: &impl AsColumnFamilyRef,
459        readopts: ReadOptions,
460    ) -> RocksDBRawIter<'b> {
461        match self {
462            Self::DBWithThreadMode(db) => {
463                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
464            }
465            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
466                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
467            ),
468        }
469    }
470
471    pub fn iterator_cf<'a: 'b, 'b>(
472        &'a self,
473        cf_handle: &impl AsColumnFamilyRef,
474        readopts: ReadOptions,
475        mode: IteratorMode<'_>,
476    ) -> RocksDBIter<'b> {
477        match self {
478            Self::DBWithThreadMode(db) => {
479                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
480            }
481            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
482                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
483            ),
484        }
485    }
486
487    pub fn compact_range_cf<K: AsRef<[u8]>>(
488        &self,
489        cf: &impl AsColumnFamilyRef,
490        start: Option<K>,
491        end: Option<K>,
492    ) {
493        delegate_call!(self.compact_range_cf(cf, start, end))
494    }
495
496    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
497        &self,
498        cf: &impl AsColumnFamilyRef,
499        start: Option<K>,
500        end: Option<K>,
501    ) {
502        let opt = &mut CompactOptions::default();
503        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
504        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
505    }
506
507    pub fn flush(&self) -> Result<(), TypedStoreError> {
508        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
509    }
510
511    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
512        match self {
513            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
514            Self::OptimisticTransactionDB(d) => {
515                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
516            }
517        }
518    }
519
520    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
521        let checkpoint = match self {
522            Self::DBWithThreadMode(d) => {
523                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
524            }
525            Self::OptimisticTransactionDB(d) => {
526                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
527            }
528        };
529        checkpoint
530            .create_checkpoint(path)
531            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
532        Ok(())
533    }
534
535    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
536        delegate_call!(self.flush_cf(cf))
537    }
538
539    pub fn set_options_cf(
540        &self,
541        cf: &impl AsColumnFamilyRef,
542        opts: &[(&str, &str)],
543    ) -> Result<(), rocksdb::Error> {
544        delegate_call!(self.set_options_cf(cf, opts))
545    }
546
547    pub fn get_sampling_interval(&self) -> SamplingInterval {
548        match self {
549            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
550            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
551        }
552    }
553
554    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
555        match self {
556            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
557            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
558        }
559    }
560
561    pub fn write_sampling_interval(&self) -> SamplingInterval {
562        match self {
563            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
564            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
565        }
566    }
567
568    pub fn iter_sampling_interval(&self) -> SamplingInterval {
569        match self {
570            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
571            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
572        }
573    }
574
575    pub fn db_name(&self) -> String {
576        let name = match self {
577            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
578            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
579        };
580        if name.is_empty() {
581            self.default_db_name()
582        } else {
583            name.clone()
584        }
585    }
586
587    fn default_db_name(&self) -> String {
588        self.path()
589            .file_name()
590            .and_then(|f| f.to_str())
591            .unwrap_or("unknown")
592            .to_string()
593    }
594
595    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
596        delegate_call!(self.live_files())
597    }
598}
599
600pub enum RocksDBSnapshot<'a> {
601    DBWithThreadMode(rocksdb::Snapshot<'a>),
602    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
603}
604
605impl<'a> RocksDBSnapshot<'a> {
606    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
607        &'a self,
608        keys: I,
609        readopts: ReadOptions,
610    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
611    where
612        K: AsRef<[u8]>,
613        I: IntoIterator<Item = (&'b W, K)>,
614        W: 'b + AsColumnFamilyRef,
615    {
616        match self {
617            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
618            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
619        }
620    }
621    pub fn multi_get_cf<'b: 'a, K, I, W>(
622        &'a self,
623        keys: I,
624    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
625    where
626        K: AsRef<[u8]>,
627        I: IntoIterator<Item = (&'b W, K)>,
628        W: 'b + AsColumnFamilyRef,
629    {
630        match self {
631            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
632            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
633        }
634    }
635}
636
637pub enum RocksDBBatch {
638    Regular(rocksdb::WriteBatch),
639    Transactional(rocksdb::WriteBatchWithTransaction<true>),
640}
641
642macro_rules! delegate_batch_call {
643    ($self:ident.$method:ident($($args:ident),*)) => {
644        match $self {
645            Self::Regular(b) => b.$method($($args),*),
646            Self::Transactional(b) => b.$method($($args),*),
647        }
648    }
649}
650
651impl RocksDBBatch {
652    fn size_in_bytes(&self) -> usize {
653        delegate_batch_call!(self.size_in_bytes())
654    }
655
656    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
657        delegate_batch_call!(self.delete_cf(cf, key))
658    }
659
660    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
661    where
662        K: AsRef<[u8]>,
663        V: AsRef<[u8]>,
664    {
665        delegate_batch_call!(self.put_cf(cf, key, value))
666    }
667
668    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
669    where
670        K: AsRef<[u8]>,
671        V: AsRef<[u8]>,
672    {
673        delegate_batch_call!(self.merge_cf(cf, key, value))
674    }
675
676    pub fn delete_range_cf<K: AsRef<[u8]>>(
677        &mut self,
678        cf: &impl AsColumnFamilyRef,
679        from: K,
680        to: K,
681    ) -> Result<(), TypedStoreError> {
682        match self {
683            Self::Regular(batch) => {
684                batch.delete_range_cf(cf, from, to);
685                Ok(())
686            }
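            // Range deletes are only supported on regular (non-transactional) write batches.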
687            Self::Transactional(_) => panic!(),
688        }
689    }
690}
691
692#[derive(Debug, Default)]
693pub struct MetricConf {
694    pub db_name: String,
695    pub read_sample_interval: SamplingInterval,
696    pub write_sample_interval: SamplingInterval,
697    pub iter_sample_interval: SamplingInterval,
698}
699
700impl MetricConf {
701    pub fn new(db_name: &str) -> Self {
702        if db_name.is_empty() {
703            error!("A meaningful db name should be used for metrics reporting.")
704        }
705        Self {
706            db_name: db_name.to_string(),
707            read_sample_interval: SamplingInterval::default(),
708            write_sample_interval: SamplingInterval::default(),
709            iter_sample_interval: SamplingInterval::default(),
710        }
711    }
712
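    /// Sets a custom read sampling interval, keeping the defaults for writes
    /// and iteration. A minimal sketch (the db name is hypothetical):
    ///
    /// ```ignore
    /// let conf = MetricConf::new("my_db").with_sampling(SamplingInterval::default());
    /// ```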
713    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
714        Self {
715            db_name: self.db_name,
716            read_sample_interval: read_interval,
717            write_sample_interval: SamplingInterval::default(),
718            iter_sample_interval: SamplingInterval::default(),
719        }
720    }
721}
722const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
723const METRICS_ERROR: i64 = -1;
724
/// An interface to a RocksDB database, keyed by a column family.
726#[derive(Clone, Debug)]
727pub struct DBMap<K, V> {
728    pub rocksdb: Arc<RocksDB>,
729    _phantom: PhantomData<fn(K) -> V>,
    // the RocksDB ColumnFamily under which the map is stored
731    cf: String,
732    pub opts: ReadWriteOptions,
733    db_metrics: Arc<DBMetrics>,
734    get_sample_interval: SamplingInterval,
735    multiget_sample_interval: SamplingInterval,
736    write_sample_interval: SamplingInterval,
737    iter_sample_interval: SamplingInterval,
738    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
739}
740
741unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
742
743impl<K, V> DBMap<K, V> {
744    pub(crate) fn new(
745        db: Arc<RocksDB>,
746        opts: &ReadWriteOptions,
747        opt_cf: &str,
748        is_deprecated: bool,
749    ) -> Self {
750        let db_cloned = db.clone();
751        let db_metrics = DBMetrics::get();
752        let db_metrics_cloned = db_metrics.clone();
753        let cf = opt_cf.to_string();
754        let (sender, mut recv) = tokio::sync::oneshot::channel();
755        if !is_deprecated {
756            tokio::task::spawn(async move {
757                let mut interval =
758                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
759                loop {
760                    tokio::select! {
761                        _ = interval.tick() => {
762                            let db = db_cloned.clone();
763                            let cf = cf.clone();
764                            let db_metrics = db_metrics.clone();
765                            if let Err(e) = tokio::task::spawn_blocking(move || {
766                                Self::report_metrics(&db, &cf, &db_metrics);
767                            }).await {
768                                error!("Failed to log metrics with error: {}", e);
769                            }
770                        }
771                        _ = &mut recv => break,
772                    }
773                }
774                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
775            });
776        }
777        DBMap {
778            rocksdb: db.clone(),
779            opts: opts.clone(),
780            _phantom: PhantomData,
781            cf: opt_cf.to_string(),
782            db_metrics: db_metrics_cloned,
783            _metrics_task_cancel_handle: Arc::new(sender),
784            get_sample_interval: db.get_sampling_interval(),
785            multiget_sample_interval: db.multiget_sampling_interval(),
786            write_sample_interval: db.write_sampling_interval(),
787            iter_sample_interval: db.iter_sampling_interval(),
788        }
789    }
790
    /// Opens a database from a path, with specific options and an optional
    /// column family.
    ///
    /// This database is used to perform operations on a single column family,
    /// and parametrizes all operations in `DBBatch` when writing across column
    /// families.
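    ///
    /// A minimal usage sketch (not compiled as a doctest; the column family name
    /// is hypothetical):
    ///
    /// ```ignore
    /// let map = DBMap::<u32, String>::open(
    ///     tempfile::tempdir().unwrap(),
    ///     MetricConf::default(),
    ///     None,
    ///     Some("my_cf"),
    ///     &ReadWriteOptions::default(),
    /// )
    /// .expect("Failed to open storage");
    /// ```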
797    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
798    pub fn open<P: AsRef<Path>>(
799        path: P,
800        metric_conf: MetricConf,
801        db_options: Option<rocksdb::Options>,
802        opt_cf: Option<&str>,
803        rw_options: &ReadWriteOptions,
804    ) -> Result<Self, TypedStoreError> {
805        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
806        let cfs = vec![cf_key];
807        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
808        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
809    }
810
    /// Reopens an open database as a typed map operating under a specific
    /// column family. If no column family is passed, the default column
    /// family is used.
814    ///
815    /// ```
816    /// use core::fmt::Error;
817    /// use std::sync::Arc;
818    ///
819    /// use prometheus::Registry;
820    /// use tempfile::tempdir;
821    /// use typed_store::{metrics::DBMetrics, rocks::*};
822    /// #[tokio::main]
823    /// async fn main() -> Result<(), Error> {
824    ///     /// Open the DB with all needed column families first.
825    ///     let rocks = open_cf(
826    ///         tempdir().unwrap(),
827    ///         None,
828    ///         MetricConf::default(),
829    ///         &["First_CF", "Second_CF"],
830    ///     )
831    ///     .unwrap();
832    ///     /// Attach the column families to specific maps.
833    ///     let db_cf_1 = DBMap::<u32, u32>::reopen(
834    ///         &rocks,
835    ///         Some("First_CF"),
836    ///         &ReadWriteOptions::default(),
837    ///         false,
838    ///     )
839    ///     .expect("Failed to open storage");
840    ///     let db_cf_2 = DBMap::<u32, u32>::reopen(
841    ///         &rocks,
842    ///         Some("Second_CF"),
843    ///         &ReadWriteOptions::default(),
844    ///         false,
845    ///     )
846    ///     .expect("Failed to open storage");
847    ///     Ok(())
848    /// }
849    /// ```
850    #[instrument(level = "debug", skip(db), err)]
851    pub fn reopen(
852        db: &Arc<RocksDB>,
853        opt_cf: Option<&str>,
854        rw_options: &ReadWriteOptions,
855        is_deprecated: bool,
856    ) -> Result<Self, TypedStoreError> {
857        let cf_key = opt_cf
858            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
859            .to_owned();
860
861        db.cf_handle(&cf_key)
862            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;
863
864        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
865    }
866
867    pub fn batch(&self) -> DBBatch {
868        let batch = match *self.rocksdb {
869            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
870            RocksDB::OptimisticTransactionDB(_) => {
871                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
872            }
873        };
874        DBBatch::new(
875            &self.rocksdb,
876            batch,
877            self.opts.writeopts(),
878            &self.db_metrics,
879            &self.write_sample_interval,
880        )
881    }
882
883    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
884        let from_buf = be_fix_int_ser(start)?;
885        let to_buf = be_fix_int_ser(end)?;
886        self.rocksdb
887            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
888        Ok(())
889    }
890
891    pub fn compact_range_raw(
892        &self,
893        cf_name: &str,
894        start: Vec<u8>,
895        end: Vec<u8>,
896    ) -> Result<(), TypedStoreError> {
897        let cf = self
898            .rocksdb
899            .cf_handle(cf_name)
900            .expect("compact range: column family does not exist");
901        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
902        Ok(())
903    }
904
905    pub fn compact_range_to_bottom<J: Serialize>(
906        &self,
907        start: &J,
908        end: &J,
909    ) -> Result<(), TypedStoreError> {
910        let from_buf = be_fix_int_ser(start)?;
911        let to_buf = be_fix_int_ser(end)?;
912        self.rocksdb
913            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
914        Ok(())
915    }
916
917    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
918        self.rocksdb
919            .cf_handle(&self.cf)
920            .expect("Map-keying column family should have been checked at DB creation")
921    }
922
923    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
924        self.rocksdb
925            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
926    }
927
928    pub fn flush(&self) -> Result<(), TypedStoreError> {
929        self.rocksdb
930            .flush_cf(&self.cf())
931            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
932    }
933
934    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
935        self.rocksdb.set_options_cf(&self.cf(), opts)
936    }
937
938    fn get_int_property(
939        rocksdb: &RocksDB,
940        cf: &impl AsColumnFamilyRef,
941        property_name: &std::ffi::CStr,
942    ) -> Result<i64, TypedStoreError> {
943        match rocksdb.property_int_value_cf(cf, property_name) {
944            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
945            Ok(None) => Ok(0),
946            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
947        }
948    }
949
950    /// Returns a vector of raw values corresponding to the keys provided.
951    fn multi_get_pinned<J>(
952        &self,
953        keys: impl IntoIterator<Item = J>,
954    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
955    where
956        J: Borrow<K>,
957        K: Serialize,
958    {
959        let _timer = self
960            .db_metrics
961            .op_metrics
962            .rocksdb_multiget_latency_seconds
963            .with_label_values(&[&self.cf])
964            .start_timer();
965        let perf_ctx = if self.multiget_sample_interval.sample() {
966            Some(RocksDBPerfContext)
967        } else {
968            None
969        };
970        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
971            .into_iter()
972            .map(|k| be_fix_int_ser(k.borrow()))
973            .collect();
974        let results: Result<Vec<_>, TypedStoreError> = self
975            .rocksdb
976            .batched_multi_get_cf_opt(
977                &self.cf(),
978                keys_bytes?,
979                // sorted_keys=
980                false,
981                &self.opts.readopts(),
982            )
983            .into_iter()
984            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
985            .collect();
986        let entries = results?;
987        let entry_size = entries
988            .iter()
989            .flatten()
990            .map(|entry| entry.len())
991            .sum::<usize>();
992        self.db_metrics
993            .op_metrics
994            .rocksdb_multiget_bytes
995            .with_label_values(&[&self.cf])
996            .observe(entry_size as f64);
997        if perf_ctx.is_some() {
998            self.db_metrics
999                .read_perf_ctx_metrics
1000                .report_metrics(&self.cf);
1001        }
1002        Ok(entries)
1003    }
1004
1005    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
1006        let cf = rocksdb.cf_handle(cf_name).expect("Failed to get cf");
1007        db_metrics
1008            .cf_metrics
1009            .rocksdb_total_sst_files_size
1010            .with_label_values(&[cf_name])
1011            .set(
1012                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
1013                    .unwrap_or(METRICS_ERROR),
1014            );
1015        db_metrics
1016            .cf_metrics
1017            .rocksdb_total_blob_files_size
1018            .with_label_values(&[cf_name])
1019            .set(
1020                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
1021                    .unwrap_or(METRICS_ERROR),
1022            );
1023        // 7 is the default number of levels in RocksDB. If we ever change the number of
1024        // levels using `set_num_levels`, we need to update here as well. Note
1025        // that there isn't an API to query the DB to get the number of levels (yet).
1026        let total_num_files: i64 = (0..=6)
1027            .map(|level| {
1028                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
1029                    .unwrap_or(METRICS_ERROR)
1030            })
1031            .sum();
1032        db_metrics
1033            .cf_metrics
1034            .rocksdb_total_num_files
1035            .with_label_values(&[cf_name])
1036            .set(total_num_files);
1037        db_metrics
1038            .cf_metrics
1039            .rocksdb_num_level0_files
1040            .with_label_values(&[cf_name])
1041            .set(
1042                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
1043                    .unwrap_or(METRICS_ERROR),
1044            );
1045        db_metrics
1046            .cf_metrics
1047            .rocksdb_current_size_active_mem_tables
1048            .with_label_values(&[cf_name])
1049            .set(
1050                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
1051                    .unwrap_or(METRICS_ERROR),
1052            );
1053        db_metrics
1054            .cf_metrics
1055            .rocksdb_size_all_mem_tables
1056            .with_label_values(&[cf_name])
1057            .set(
1058                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
1059                    .unwrap_or(METRICS_ERROR),
1060            );
1061        db_metrics
1062            .cf_metrics
1063            .rocksdb_num_snapshots
1064            .with_label_values(&[cf_name])
1065            .set(
1066                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
1067                    .unwrap_or(METRICS_ERROR),
1068            );
1069        db_metrics
1070            .cf_metrics
1071            .rocksdb_oldest_snapshot_time
1072            .with_label_values(&[cf_name])
1073            .set(
1074                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
1075                    .unwrap_or(METRICS_ERROR),
1076            );
1077        db_metrics
1078            .cf_metrics
1079            .rocksdb_actual_delayed_write_rate
1080            .with_label_values(&[cf_name])
1081            .set(
1082                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
1083                    .unwrap_or(METRICS_ERROR),
1084            );
1085        db_metrics
1086            .cf_metrics
1087            .rocksdb_is_write_stopped
1088            .with_label_values(&[cf_name])
1089            .set(
1090                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
1091                    .unwrap_or(METRICS_ERROR),
1092            );
1093        db_metrics
1094            .cf_metrics
1095            .rocksdb_block_cache_capacity
1096            .with_label_values(&[cf_name])
1097            .set(
1098                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
1099                    .unwrap_or(METRICS_ERROR),
1100            );
1101        db_metrics
1102            .cf_metrics
1103            .rocksdb_block_cache_usage
1104            .with_label_values(&[cf_name])
1105            .set(
1106                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
1107                    .unwrap_or(METRICS_ERROR),
1108            );
1109        db_metrics
1110            .cf_metrics
1111            .rocksdb_block_cache_pinned_usage
1112            .with_label_values(&[cf_name])
1113            .set(
1114                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
1115                    .unwrap_or(METRICS_ERROR),
1116            );
1117        db_metrics
1118            .cf_metrics
1119            .rocksdb_estimate_table_readers_mem
1120            .with_label_values(&[cf_name])
1121            .set(
1122                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
1123                    .unwrap_or(METRICS_ERROR),
1124            );
1125        db_metrics
1126            .cf_metrics
1127            .rocksdb_estimated_num_keys
1128            .with_label_values(&[cf_name])
1129            .set(
1130                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
1131                    .unwrap_or(METRICS_ERROR),
1132            );
1133        db_metrics
1134            .cf_metrics
1135            .rocksdb_num_immutable_mem_tables
1136            .with_label_values(&[cf_name])
1137            .set(
1138                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
1139                    .unwrap_or(METRICS_ERROR),
1140            );
1141        db_metrics
1142            .cf_metrics
1143            .rocksdb_mem_table_flush_pending
1144            .with_label_values(&[cf_name])
1145            .set(
1146                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
1147                    .unwrap_or(METRICS_ERROR),
1148            );
1149        db_metrics
1150            .cf_metrics
1151            .rocksdb_compaction_pending
1152            .with_label_values(&[cf_name])
1153            .set(
1154                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
1155                    .unwrap_or(METRICS_ERROR),
1156            );
1157        db_metrics
1158            .cf_metrics
1159            .rocksdb_estimate_pending_compaction_bytes
1160            .with_label_values(&[cf_name])
1161            .set(
1162                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
1163                    .unwrap_or(METRICS_ERROR),
1164            );
1165        db_metrics
1166            .cf_metrics
1167            .rocksdb_num_running_compactions
1168            .with_label_values(&[cf_name])
1169            .set(
1170                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1171                    .unwrap_or(METRICS_ERROR),
1172            );
1173        db_metrics
1174            .cf_metrics
1175            .rocksdb_num_running_flushes
1176            .with_label_values(&[cf_name])
1177            .set(
1178                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1179                    .unwrap_or(METRICS_ERROR),
1180            );
1181        db_metrics
1182            .cf_metrics
1183            .rocksdb_estimate_oldest_key_time
1184            .with_label_values(&[cf_name])
1185            .set(
1186                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1187                    .unwrap_or(METRICS_ERROR),
1188            );
1189        db_metrics
1190            .cf_metrics
1191            .rocksdb_background_errors
1192            .with_label_values(&[cf_name])
1193            .set(
1194                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1195                    .unwrap_or(METRICS_ERROR),
1196            );
1197        db_metrics
1198            .cf_metrics
1199            .rocksdb_base_level
1200            .with_label_values(&[cf_name])
1201            .set(
1202                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1203                    .unwrap_or(METRICS_ERROR),
1204            );
1205    }
1206
1207    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1208        DBTransaction::new(&self.rocksdb)
1209    }
1210
1211    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1212        DBTransaction::new_without_snapshot(&self.rocksdb)
1213    }
1214
1215    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1216        self.rocksdb.checkpoint(path)
1217    }
1218
1219    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
1220        Ok(self.rocksdb.snapshot())
1221    }
1222
1223    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
1224        let mut num_keys = 0;
1225        let mut key_bytes_total = 0;
1226        let mut value_bytes_total = 0;
1227        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1228        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1229        let iter = self.iterator_cf().map(Result::unwrap);
1230        for (key, value) in iter {
1231            num_keys += 1;
1232            key_bytes_total += key.len();
1233            value_bytes_total += value.len();
1234            key_hist.record(key.len() as u64)?;
1235            value_hist.record(value.len() as u64)?;
1236        }
1237        Ok(TableSummary {
1238            num_keys,
1239            key_bytes_total,
1240            value_bytes_total,
1241            key_hist,
1242            value_hist,
1243        })
1244    }
1245
    // Creates metrics and context for tracking iterator usage and performance.
1247    fn create_iter_context(
1248        &self,
1249    ) -> (
1250        Option<HistogramTimer>,
1251        Option<Histogram>,
1252        Option<Histogram>,
1253        Option<RocksDBPerfContext>,
1254    ) {
1255        let timer = self
1256            .db_metrics
1257            .op_metrics
1258            .rocksdb_iter_latency_seconds
1259            .with_label_values(&[&self.cf])
1260            .start_timer();
1261        let bytes_scanned = self
1262            .db_metrics
1263            .op_metrics
1264            .rocksdb_iter_bytes
1265            .with_label_values(&[&self.cf]);
1266        let keys_scanned = self
1267            .db_metrics
1268            .op_metrics
1269            .rocksdb_iter_keys
1270            .with_label_values(&[&self.cf]);
1271        let perf_ctx = if self.iter_sample_interval.sample() {
1272            Some(RocksDBPerfContext)
1273        } else {
1274            None
1275        };
1276        (
1277            Some(timer),
1278            Some(bytes_scanned),
1279            Some(keys_scanned),
1280            perf_ctx,
1281        )
1282    }
1283
    // Creates a RocksDB read option with specified lower and upper bounds.
    // Lower bound is inclusive, and upper bound is exclusive.
1286    fn create_read_options_with_bounds(
1287        &self,
1288        lower_bound: Option<K>,
1289        upper_bound: Option<K>,
1290    ) -> ReadOptions
1291    where
1292        K: Serialize,
1293    {
1294        let mut readopts = self.opts.readopts();
1295        if let Some(lower_bound) = lower_bound {
1296            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
1297            readopts.set_iterate_lower_bound(key_buf);
1298        }
1299        if let Some(upper_bound) = upper_bound {
1300            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
1301            readopts.set_iterate_upper_bound(key_buf);
1302        }
1303        readopts
1304    }
1305
    // Creates a RocksDB read option with lower and upper bounds set corresponding
    // to `range`.
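    // For example, `3..7` keeps the serialized `3` as the (inclusive) lower bound
    // and uses the serialized `7` directly as the exclusive upper bound, while
    // `3..=7` increments the serialized `7` by one so that the inclusive end
    // becomes a valid exclusive upper bound.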
1308    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
1309    where
1310        K: Serialize,
1311    {
1312        let mut readopts = self.opts.readopts();
1313
1314        let lower_bound = range.start_bound();
1315        let upper_bound = range.end_bound();
1316
1317        match lower_bound {
1318            Bound::Included(lower_bound) => {
1319                // Rocksdb lower bound is inclusive by default so nothing to do
1320                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1321                readopts.set_iterate_lower_bound(key_buf);
1322            }
1323            Bound::Excluded(lower_bound) => {
1324                let mut key_buf =
1325                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1326
                // Since we want an exclusive lower bound, increment the key to skip the provided key
1328                big_endian_saturating_add_one(&mut key_buf);
1329                readopts.set_iterate_lower_bound(key_buf);
1330            }
1331            Bound::Unbounded => (),
1332        };
1333
1334        match upper_bound {
1335            Bound::Included(upper_bound) => {
1336                let mut key_buf =
1337                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1338
1339                // If the key is already at the limit, there's nowhere else to go, so no upper
1340                // bound
1341                if !is_max(&key_buf) {
1342                    // Since we want exclusive, we need to increment the key to get the upper bound
1343                    big_endian_saturating_add_one(&mut key_buf);
1344                    readopts.set_iterate_upper_bound(key_buf);
1345                }
1346            }
1347            Bound::Excluded(upper_bound) => {
                // The RocksDB upper bound is exclusive by default, so nothing to do
1349                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1350                readopts.set_iterate_upper_bound(key_buf);
1351            }
1352            Bound::Unbounded => (),
1353        };
1354
1355        readopts
1356    }
1357}
1358
/// Provides a mutable struct to form a collection of database write operations,
/// and execute them.
///
/// Batching write and delete operations is faster than performing them one by
/// one and ensures their atomicity, i.e. they are all written or none is.
/// This is also true of operations across column families in the same database.
///
/// Serialization/deserialization and column family selection are handled by
/// passing a DBMap<K, V> with each operation.
1368///
1369/// ```
1370/// use core::fmt::Error;
1371/// use std::sync::Arc;
1372///
1373/// use prometheus::Registry;
1374/// use tempfile::tempdir;
1375/// use typed_store::{Map, metrics::DBMetrics, rocks::*};
1376///
1377/// #[tokio::main]
1378/// async fn main() -> Result<(), Error> {
1379///     let rocks = open_cf(
1380///         tempfile::tempdir().unwrap(),
1381///         None,
1382///         MetricConf::default(),
1383///         &["First_CF", "Second_CF"],
1384///     )
1385///     .unwrap();
1386///
1387///     let db_cf_1 = DBMap::reopen(
1388///         &rocks,
1389///         Some("First_CF"),
1390///         &ReadWriteOptions::default(),
1391///         false,
1392///     )
1393///     .expect("Failed to open storage");
1394///     let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1395///
1396///     let db_cf_2 = DBMap::reopen(
1397///         &rocks,
1398///         Some("Second_CF"),
1399///         &ReadWriteOptions::default(),
1400///         false,
1401///     )
1402///     .expect("Failed to open storage");
1403///     let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1404///
1405///     let mut batch = db_cf_1.batch();
1406///     batch
1407///         .insert_batch(&db_cf_1, keys_vals_1.clone())
1408///         .expect("Failed to batch insert")
1409///         .insert_batch(&db_cf_2, keys_vals_2.clone())
1410///         .expect("Failed to batch insert");
1411///
1412///     let _ = batch.write().expect("Failed to execute batch");
1413///     for (k, v) in keys_vals_1 {
1414///         let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1415///         assert_eq!(Some(v), val);
1416///     }
1417///
1418///     for (k, v) in keys_vals_2 {
1419///         let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1420///         assert_eq!(Some(v), val);
1421///     }
1422///     Ok(())
1423/// }
1424/// ```
1425pub struct DBBatch {
1426    rocksdb: Arc<RocksDB>,
1427    batch: RocksDBBatch,
1428    opts: WriteOptions,
1429    db_metrics: Arc<DBMetrics>,
1430    write_sample_interval: SamplingInterval,
1431}
1432
1433impl DBBatch {
    /// Creates a new batch associated with a DB reference.
    ///
    /// Use `open_cf` to get the DB reference, or use an existing open database.
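    ///
    /// In practice a batch is usually obtained through `DBMap::batch()` rather
    /// than constructed directly, e.g. (sketch):
    ///
    /// ```ignore
    /// let mut batch = db_map.batch();
    /// ```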
1437    pub fn new(
1438        dbref: &Arc<RocksDB>,
1439        batch: RocksDBBatch,
1440        opts: WriteOptions,
1441        db_metrics: &Arc<DBMetrics>,
1442        write_sample_interval: &SamplingInterval,
1443    ) -> Self {
1444        DBBatch {
1445            rocksdb: dbref.clone(),
1446            batch,
1447            opts,
1448            db_metrics: db_metrics.clone(),
1449            write_sample_interval: write_sample_interval.clone(),
1450        }
1451    }
1452
1453    /// Consume the batch and write its operations to the database
1454    #[instrument(level = "trace", skip_all, err)]
1455    pub fn write(self) -> Result<(), TypedStoreError> {
1456        let db_name = self.rocksdb.db_name();
1457        let timer = self
1458            .db_metrics
1459            .op_metrics
1460            .rocksdb_batch_commit_latency_seconds
1461            .with_label_values(&[&db_name])
1462            .start_timer();
1463        let batch_size = self.batch.size_in_bytes();
1464
1465        let perf_ctx = if self.write_sample_interval.sample() {
1466            Some(RocksDBPerfContext)
1467        } else {
1468            None
1469        };
1470        self.rocksdb.write(self.batch, &self.opts)?;
1471        self.db_metrics
1472            .op_metrics
1473            .rocksdb_batch_commit_bytes
1474            .with_label_values(&[&db_name])
1475            .observe(batch_size as f64);
1476
1477        if perf_ctx.is_some() {
1478            self.db_metrics
1479                .write_perf_ctx_metrics
1480                .report_metrics(&db_name);
1481        }
1482        let elapsed = timer.stop_and_record();
1483        if elapsed > 1.0 {
1484            warn!(?elapsed, ?db_name, "very slow batch write");
1485            self.db_metrics
1486                .op_metrics
1487                .rocksdb_very_slow_batch_writes_count
1488                .with_label_values(&[&db_name])
1489                .inc();
1490            self.db_metrics
1491                .op_metrics
1492                .rocksdb_very_slow_batch_writes_duration_ms
1493                .with_label_values(&[&db_name])
1494                .inc_by((elapsed * 1000.0) as u64);
1495        }
1496        Ok(())
1497    }
1498
1499    pub fn size_in_bytes(&self) -> usize {
1500        self.batch.size_in_bytes()
1501    }
1502}
1503
1504// TODO: Remove this entire implementation once we switch to sally
1505impl DBBatch {
1506    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1507        &mut self,
1508        db: &DBMap<K, V>,
1509        purged_vals: impl IntoIterator<Item = J>,
1510    ) -> Result<(), TypedStoreError> {
1511        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1512            return Err(TypedStoreError::CrossDBBatch);
1513        }
1514
1515        purged_vals
1516            .into_iter()
1517            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1518                let k_buf = be_fix_int_ser(k.borrow())?;
1519                self.batch.delete_cf(&db.cf(), k_buf);
1520
1521                Ok(())
1522            })?;
1523        Ok(())
1524    }
1525
    /// Deletes a range of keys between `from` (inclusive) and `to`
    /// (non-inclusive) by writing a range delete tombstone in the db map.
    /// If the DBMap is configured with `ignore_range_deletions` set to false,
    /// the effect of this write is visible immediately, i.e. you won't
    /// see old values when you do a lookup or scan. But if it is configured
    /// with `ignore_range_deletions` set to true, the old values remain visible
    /// until compaction actually deletes them, which will happen sometime later.
    /// By default `ignore_range_deletions` is set to true on a DBMap (unless it
    /// is overridden in the config), so please use this function with caution.
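    ///
    /// A minimal sketch (assuming a `DBMap<u32, String>` named `map`):
    ///
    /// ```ignore
    /// let mut batch = map.batch();
    /// batch.schedule_delete_range(&map, &10, &20)?;
    /// batch.write()?;
    /// ```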
1535    pub fn schedule_delete_range<K: Serialize, V>(
1536        &mut self,
1537        db: &DBMap<K, V>,
1538        from: &K,
1539        to: &K,
1540    ) -> Result<(), TypedStoreError> {
1541        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1542            return Err(TypedStoreError::CrossDBBatch);
1543        }
1544
1545        let from_buf = be_fix_int_ser(from)?;
1546        let to_buf = be_fix_int_ser(to)?;
1547
1548        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
1549        Ok(())
1550    }
1551
1552    /// Inserts a set of (key, value) pairs given as an iterator
1553    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1554        &mut self,
1555        db: &DBMap<K, V>,
1556        new_vals: impl IntoIterator<Item = (J, U)>,
1557    ) -> Result<&mut Self, TypedStoreError> {
1558        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1559            return Err(TypedStoreError::CrossDBBatch);
1560        }
1561        let mut total = 0usize;
1562        new_vals
1563            .into_iter()
1564            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1565                let k_buf = be_fix_int_ser(k.borrow())?;
1566                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1567                total += k_buf.len() + v_buf.len();
1568                self.batch.put_cf(&db.cf(), k_buf, v_buf);
1569                Ok(())
1570            })?;
1571        self.db_metrics
1572            .op_metrics
1573            .rocksdb_batch_put_bytes
1574            .with_label_values(&[&db.cf])
1575            .observe(total as f64);
1576        Ok(self)
1577    }
1578
1579    /// Merges a set of (key, value) pairs given as an iterator
1580    pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1581        &mut self,
1582        db: &DBMap<K, V>,
1583        new_vals: impl IntoIterator<Item = (J, U)>,
1584    ) -> Result<&mut Self, TypedStoreError> {
1585        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1586            return Err(TypedStoreError::CrossDBBatch);
1587        }
1588
1589        new_vals
1590            .into_iter()
1591            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1592                let k_buf = be_fix_int_ser(k.borrow())?;
1593                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1594                self.batch.merge_cf(&db.cf(), k_buf, v_buf);
1595                Ok(())
1596            })?;
1597        Ok(self)
1598    }
1599
1600    /// Similar to `merge_batch`, but allows merging with partial values
1601    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
1602        &mut self,
1603        db: &DBMap<K, V>,
1604        new_vals: impl IntoIterator<Item = (J, B)>,
1605    ) -> Result<&mut Self, TypedStoreError> {
1606        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1607            return Err(TypedStoreError::CrossDBBatch);
1608        }
1609        new_vals
1610            .into_iter()
1611            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1612                let k_buf = be_fix_int_ser(k.borrow())?;
1613                self.batch.merge_cf(&db.cf(), k_buf, v);
1614                Ok(())
1615            })?;
1616        Ok(self)
1617    }
1618}
1619
1620pub struct DBTransaction<'a> {
1621    rocksdb: Arc<RocksDB>,
1622    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
1623}
1624
1625impl<'a> DBTransaction<'a> {
1626    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1627        Ok(Self {
1628            rocksdb: db.clone(),
1629            transaction: db.transaction()?,
1630        })
1631    }
1632
1633    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1634        Ok(Self {
1635            rocksdb: db.clone(),
1636            transaction: db.transaction_without_snapshot()?,
1637        })
1638    }
1639
1640    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1641        &mut self,
1642        db: &DBMap<K, V>,
1643        new_vals: impl IntoIterator<Item = (J, U)>,
1644    ) -> Result<&mut Self, TypedStoreError> {
1645        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1646            return Err(TypedStoreError::CrossDBBatch);
1647        }
1648
1649        new_vals
1650            .into_iter()
1651            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1652                let k_buf = be_fix_int_ser(k.borrow())?;
1653                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1654                self.transaction
1655                    .put_cf(&db.cf(), k_buf, v_buf)
1656                    .map_err(typed_store_err_from_rocks_err)?;
1657                Ok(())
1658            })?;
1659        Ok(self)
1660    }
1661
1662    /// Deletes a set of keys given as an iterator
1663    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1664        &mut self,
1665        db: &DBMap<K, V>,
1666        purged_vals: impl IntoIterator<Item = J>,
1667    ) -> Result<&mut Self, TypedStoreError> {
1668        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1669            return Err(TypedStoreError::CrossDBBatch);
1670        }
1671        purged_vals
1672            .into_iter()
1673            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1674                let k_buf = be_fix_int_ser(k.borrow())?;
1675                self.transaction
1676                    .delete_cf(&db.cf(), k_buf)
1677                    .map_err(typed_store_err_from_rocks_err)?;
1678                Ok(())
1679            })?;
1680        Ok(self)
1681    }
1682
1683    pub fn snapshot(
1684        &self,
1685    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
1686    {
1687        self.transaction.snapshot()
1688    }
1689
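    /// Reads a key and tracks it in the transaction, so that the commit is
    /// expected to fail with a retryable error if another writer changes the
    /// key first. A hedged read-modify-write sketch (illustrative only; `tx`
    /// is a `DBTransaction` and `counters` is assumed to be a
    /// `DBMap<String, u64>`):
    ///
    /// ```ignore
    /// let current = tx.get_for_update(&counters, &key)?.unwrap_or(0);
    /// tx.insert_batch(&counters, [(key.clone(), current + 1)])?;
    /// tx.commit()?;
    /// ```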
1690    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
1691        &self,
1692        db: &DBMap<K, V>,
1693        key: &K,
1694    ) -> Result<Option<V>, TypedStoreError> {
1695        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1696            return Err(TypedStoreError::CrossDBBatch);
1697        }
1698        let k_buf = be_fix_int_ser(key)?;
1699        match self
1700            .transaction
1701            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
1702            .map_err(typed_store_err_from_rocks_err)?
1703        {
1704            Some(data) => Ok(Some(
1705                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1706            )),
1707            None => Ok(None),
1708        }
1709    }
1710
1711    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
1712        &self,
1713        db: &DBMap<K, V>,
1714        key: &K,
1715    ) -> Result<Option<V>, TypedStoreError> {
1716        let key_buf = be_fix_int_ser(key)?;
1717        self.transaction
1718            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
1719            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
1720            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
1721    }
1722
1723    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
1724        &self,
1725        db: &DBMap<K, V>,
1726        keys: impl IntoIterator<Item = J>,
1727    ) -> Result<Vec<Option<V>>, TypedStoreError> {
1728        let cf = db.cf();
1729        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
1730            .into_iter()
1731            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
1732            .collect();
1733
1734        let results = self
1735            .transaction
1736            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());
1737
1738        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1739            .into_iter()
1740            .map(
1741                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
1742                    Some(data) => Ok(Some(
1743                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1744                    )),
1745                    None => Ok(None),
1746                },
1747            )
1748            .collect();
1749
1750        values_parsed
1751    }
1752
1753    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
1754        &'a self,
1755        db: &DBMap<K, V>,
1756    ) -> Iter<'a, K, V> {
1757        let db_iter = self
1758            .transaction
1759            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
1760        Iter::new(
1761            db.cf.clone(),
1762            RocksDBRawIter::OptimisticTransaction(db_iter),
1763            None,
1764            None,
1765            None,
1766            None,
1767            None,
1768        )
1769    }
1770
1771    pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
1772        &'a self,
1773        db: &DBMap<K, V>,
1774    ) -> Keys<'a, K> {
1775        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1776            self.transaction
1777                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1778        );
1779        db_iter.seek_to_first();
1780
1781        Keys::new(db_iter)
1782    }
1783
1784    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
1785        &'a self,
1786        db: &DBMap<K, V>,
1787    ) -> Values<'a, V> {
1788        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1789            self.transaction
1790                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1791        );
1792        db_iter.seek_to_first();
1793
1794        Values::new(db_iter)
1795    }
1796
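    /// Commits the transaction. Write conflicts typically surface as
    /// `TypedStoreError::RetryableTransaction`, so callers usually retry. A
    /// hedged retry-loop sketch (illustrative only; `db` is an `Arc<RocksDB>`
    /// opened via `open_cf_opts_transactional` and `map` is a `DBMap` on it):
    ///
    /// ```ignore
    /// loop {
    ///     let mut tx = DBTransaction::new(&db)?;
    ///     tx.insert_batch(&map, [(key.clone(), value.clone())])?;
    ///     match tx.commit() {
    ///         Err(TypedStoreError::RetryableTransaction) => continue, // write conflict; retry
    ///         res => break res?,
    ///     }
    /// }
    /// ```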
1797    pub fn commit(self) -> Result<(), TypedStoreError> {
1798        fail_point!("transaction-commit");
1799        self.transaction.commit().map_err(|e| match e.kind() {
1800            // Empirically, this is what you get when there is a write conflict. It is not
1801            // documented whether this is the only time you can get this error.
1802            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
1803            _ => typed_store_err_from_rocks_err(e),
1804        })?;
1805        Ok(())
1806    }
1807}
1808
1809macro_rules! delegate_iter_call {
1810    ($self:ident.$method:ident($($args:ident),*)) => {
1811        match $self {
1812            Self::DB(db) => db.$method($($args),*),
1813            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
1814            Self::OptimisticTransaction(db) => db.$method($($args),*),
1815        }
1816    }
1817}
1818
1819pub enum RocksDBRawIter<'a> {
1820    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1821    OptimisticTransactionDB(
1822        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1823    ),
1824    OptimisticTransaction(
1825        rocksdb::DBRawIteratorWithThreadMode<
1826            'a,
1827            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1828        >,
1829    ),
1830}
1831
1832impl RocksDBRawIter<'_> {
1833    pub fn valid(&self) -> bool {
1834        delegate_iter_call!(self.valid())
1835    }
1836    pub fn key(&self) -> Option<&[u8]> {
1837        delegate_iter_call!(self.key())
1838    }
1839    pub fn value(&self) -> Option<&[u8]> {
1840        delegate_iter_call!(self.value())
1841    }
1842    pub fn next(&mut self) {
1843        delegate_iter_call!(self.next())
1844    }
1845    pub fn prev(&mut self) {
1846        delegate_iter_call!(self.prev())
1847    }
1848    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
1849        delegate_iter_call!(self.seek(key))
1850    }
1851    pub fn seek_to_last(&mut self) {
1852        delegate_iter_call!(self.seek_to_last())
1853    }
1854    pub fn seek_to_first(&mut self) {
1855        delegate_iter_call!(self.seek_to_first())
1856    }
1857    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
1858        delegate_iter_call!(self.seek_for_prev(key))
1859    }
1860    pub fn status(&self) -> Result<(), rocksdb::Error> {
1861        delegate_iter_call!(self.status())
1862    }
1863}
1864
1865pub enum RocksDBIter<'a> {
1866    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1867    OptimisticTransactionDB(
1868        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1869    ),
1870}
1871
1872impl Iterator for RocksDBIter<'_> {
1873    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
1874    fn next(&mut self) -> Option<Self::Item> {
1875        match self {
1876            Self::DB(db) => db.next(),
1877            Self::OptimisticTransactionDB(db) => db.next(),
1878        }
1879    }
1880}
1881
1882impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1883where
1884    K: Serialize + DeserializeOwned,
1885    V: Serialize + DeserializeOwned,
1886{
1887    type Error = TypedStoreError;
1888    type Iterator = Iter<'a, K, V>;
1889    type SafeIterator = SafeIter<'a, K, V>;
1890    type Keys = Keys<'a, K>;
1891    type Values = Values<'a, V>;
1892
1893    #[instrument(level = "trace", skip_all, err)]
1894    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1895        let key_buf = be_fix_int_ser(key)?;
1896        // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
1897        // but no false negatives. We use it to short-circuit the absent case
1898        let readopts = self.opts.readopts();
1899        Ok(self
1900            .rocksdb
1901            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1902            && self
1903                .rocksdb
1904                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1905                .map_err(typed_store_err_from_rocks_err)?
1906                .is_some())
1907    }
1908
1909    #[instrument(level = "trace", skip_all, err)]
1910    fn multi_contains_keys<J>(
1911        &self,
1912        keys: impl IntoIterator<Item = J>,
1913    ) -> Result<Vec<bool>, Self::Error>
1914    where
1915        J: Borrow<K>,
1916    {
1917        let values = self.multi_get_pinned(keys)?;
1918        Ok(values.into_iter().map(|v| v.is_some()).collect())
1919    }
1920
1921    #[instrument(level = "trace", skip_all, err)]
1922    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1923        let _timer = self
1924            .db_metrics
1925            .op_metrics
1926            .rocksdb_get_latency_seconds
1927            .with_label_values(&[&self.cf])
1928            .start_timer();
1929        let perf_ctx = if self.get_sample_interval.sample() {
1930            Some(RocksDBPerfContext)
1931        } else {
1932            None
1933        };
1934        let key_buf = be_fix_int_ser(key)?;
1935        let res = self
1936            .rocksdb
1937            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1938            .map_err(typed_store_err_from_rocks_err)?;
1939        self.db_metrics
1940            .op_metrics
1941            .rocksdb_get_bytes
1942            .with_label_values(&[&self.cf])
1943            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1944        if perf_ctx.is_some() {
1945            self.db_metrics
1946                .read_perf_ctx_metrics
1947                .report_metrics(&self.cf);
1948        }
1949        match res {
1950            Some(data) => Ok(Some(
1951                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1952            )),
1953            None => Ok(None),
1954        }
1955    }
1956
1957    #[instrument(level = "trace", skip_all, err)]
1958    fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
1959        let _timer = self
1960            .db_metrics
1961            .op_metrics
1962            .rocksdb_get_latency_seconds
1963            .with_label_values(&[&self.cf])
1964            .start_timer();
1965        let perf_ctx = if self.get_sample_interval.sample() {
1966            Some(RocksDBPerfContext)
1967        } else {
1968            None
1969        };
1970        let key_buf = be_fix_int_ser(key)?;
1971        let res = self
1972            .rocksdb
1973            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1974            .map_err(typed_store_err_from_rocks_err)?;
1975        self.db_metrics
1976            .op_metrics
1977            .rocksdb_get_bytes
1978            .with_label_values(&[&self.cf])
1979            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1980        if perf_ctx.is_some() {
1981            self.db_metrics
1982                .read_perf_ctx_metrics
1983                .report_metrics(&self.cf);
1984        }
1985        match res {
1986            Some(data) => Ok(Some(data.to_vec())),
1987            None => Ok(None),
1988        }
1989    }
1990
1991    #[instrument(level = "trace", skip_all, err)]
1992    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1993        let timer = self
1994            .db_metrics
1995            .op_metrics
1996            .rocksdb_put_latency_seconds
1997            .with_label_values(&[&self.cf])
1998            .start_timer();
1999        let perf_ctx = if self.write_sample_interval.sample() {
2000            Some(RocksDBPerfContext)
2001        } else {
2002            None
2003        };
2004        let key_buf = be_fix_int_ser(key)?;
2005        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
2006        self.db_metrics
2007            .op_metrics
2008            .rocksdb_put_bytes
2009            .with_label_values(&[&self.cf])
2010            .observe((key_buf.len() + value_buf.len()) as f64);
2011        if perf_ctx.is_some() {
2012            self.db_metrics
2013                .write_perf_ctx_metrics
2014                .report_metrics(&self.cf);
2015        }
2016        self.rocksdb
2017            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
2018            .map_err(typed_store_err_from_rocks_err)?;
2019
2020        let elapsed = timer.stop_and_record();
2021        if elapsed > 1.0 {
2022            warn!(?elapsed, cf = ?self.cf, "very slow insert");
2023            self.db_metrics
2024                .op_metrics
2025                .rocksdb_very_slow_puts_count
2026                .with_label_values(&[&self.cf])
2027                .inc();
2028            self.db_metrics
2029                .op_metrics
2030                .rocksdb_very_slow_puts_duration_ms
2031                .with_label_values(&[&self.cf])
2032                .inc_by((elapsed * 1000.0) as u64);
2033        }
2034
2035        Ok(())
2036    }
2037
2038    #[instrument(level = "trace", skip_all, err)]
2039    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
2040        let _timer = self
2041            .db_metrics
2042            .op_metrics
2043            .rocksdb_delete_latency_seconds
2044            .with_label_values(&[&self.cf])
2045            .start_timer();
2046        let perf_ctx = if self.write_sample_interval.sample() {
2047            Some(RocksDBPerfContext)
2048        } else {
2049            None
2050        };
2051        let key_buf = be_fix_int_ser(key)?;
2052        self.rocksdb
2053            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
2054            .map_err(typed_store_err_from_rocks_err)?;
2055        self.db_metrics
2056            .op_metrics
2057            .rocksdb_deletes
2058            .with_label_values(&[&self.cf])
2059            .inc();
2060        if perf_ctx.is_some() {
2061            self.db_metrics
2062                .write_perf_ctx_metrics
2063                .report_metrics(&self.cf);
2064        }
2065        Ok(())
2066    }
2067
2068    /// Deletes a range of keys between `from` (inclusive) and `to`
2069    /// (non-inclusive) by immediately deleting any sst files whose key
2070    /// range falls entirely within the range. Files whose range only partially
2071    /// overlaps with the range are not deleted. This can be useful for
2072    /// quickly removing a large amount of data without having
2073    /// to delete individual keys. Only files at level 1 or higher are
2074    /// considered (level 0 files are skipped). It doesn't guarantee that
2075    /// all keys in the range are deleted, as there might be keys in files
2076    /// that weren't entirely within the range.
2077    #[instrument(level = "trace", skip_all, err)]
2078    fn delete_file_in_range(&self, from: &K, to: &K) -> Result<(), TypedStoreError> {
2079        let from_buf = be_fix_int_ser(from.borrow())?;
2080        let to_buf = be_fix_int_ser(to.borrow())?;
2081        self.rocksdb
2082            .delete_file_in_range(&self.cf(), from_buf, to_buf)
2083            .map_err(typed_store_err_from_rocks_err)?;
2084        Ok(())
2085    }
2086
2087    /// This method first drops the existing column family and then creates a
2088    /// new one with the same name. The two operations are not atomic and
2089    /// hence it is possible to get into a race condition where the column
2090    /// family has been dropped but the new one has not been created yet.
2091    #[instrument(level = "trace", skip_all, err)]
2092    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
2093        let _ = self.rocksdb.drop_cf(&self.cf);
2094        self.rocksdb
2095            .create_cf(self.cf.clone(), &default_db_options().options)
2096            .map_err(typed_store_err_from_rocks_err)?;
2097        Ok(())
2098    }
2099
2100    /// Writes a range delete tombstone to delete all entries in the db map.
2101    /// If the DBMap is configured with ignore_range_deletions set to false,
2102    /// the effect of this write is visible immediately, i.e. you won't
2103    /// see old values when you do a lookup or scan. But if it is configured
2104    /// with ignore_range_deletions set to true, the old values stay visible
2105    /// until compaction actually deletes them, which happens some time later. By
2106    /// default ignore_range_deletions is set to true on a DBMap (unless it is
2107    /// overridden in the config), so please use this function with caution.
2108    #[instrument(level = "trace", skip_all, err)]
2109    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2110        let mut iter = self.unbounded_iter().seek_to_first();
2111        let first_key = iter.next().map(|(k, _v)| k);
2112        let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
2113        if let Some((first_key, last_key)) = first_key.zip(last_key) {
2114            let mut batch = self.batch();
2115            batch.schedule_delete_range(self, &first_key, &last_key)?;
2116            batch.write()?;
2117        }
2118        Ok(())
2119    }
2120
2121    fn is_empty(&self) -> bool {
2122        self.safe_iter().next().is_none()
2123    }
2124
2125    /// Returns an unbounded iterator visiting each key-value pair in the map.
2126    /// This is potentially unsafe as it can perform a full table scan
2127    fn unbounded_iter(&'a self) -> Self::Iterator {
2128        let db_iter = self
2129            .rocksdb
2130            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2131        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2132        Iter::new(
2133            self.cf.clone(),
2134            db_iter,
2135            _timer,
2136            _perf_ctx,
2137            bytes_scanned,
2138            keys_scanned,
2139            Some(self.db_metrics.clone()),
2140        )
2141    }
2142
2143    /// Returns an iterator visiting each key-value pair in the map. By providing
2144    /// bounds of the scan range, RocksDB can avoid unnecessary scans.
2145    /// Lower bound is inclusive, while upper bound is exclusive.
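    ///
    /// A hedged sketch (illustrative only; assumes a `DBMap<u64, String>`):
    ///
    /// ```ignore
    /// // Visits the pairs with keys 10, 11, ..., 19 (20 is excluded).
    /// for (k, v) in db.iter_with_bounds(Some(10), Some(20)) {
    ///     println!("{k}: {v}");
    /// }
    /// ```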
2146    fn iter_with_bounds(
2147        &'a self,
2148        lower_bound: Option<K>,
2149        upper_bound: Option<K>,
2150    ) -> Self::Iterator {
2151        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2152        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2153        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2154        Iter::new(
2155            self.cf.clone(),
2156            db_iter,
2157            _timer,
2158            _perf_ctx,
2159            bytes_scanned,
2160            keys_scanned,
2161            Some(self.db_metrics.clone()),
2162        )
2163    }
2164
2165    /// Similar to `iter_with_bounds` but allows specifying
2166    /// inclusivity/exclusivity of ranges explicitly. TODO: find better name
2167    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2168        let readopts = self.create_read_options_with_range(range);
2169        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2170        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2171        Iter::new(
2172            self.cf.clone(),
2173            db_iter,
2174            _timer,
2175            _perf_ctx,
2176            bytes_scanned,
2177            keys_scanned,
2178            Some(self.db_metrics.clone()),
2179        )
2180    }
2181
2182    fn safe_iter(&'a self) -> Self::SafeIterator {
2183        let db_iter = self
2184            .rocksdb
2185            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2186        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2187        SafeIter::new(
2188            self.cf.clone(),
2189            db_iter,
2190            _timer,
2191            _perf_ctx,
2192            bytes_scanned,
2193            keys_scanned,
2194            Some(self.db_metrics.clone()),
2195        )
2196    }
2197
2198    fn safe_iter_with_bounds(
2199        &'a self,
2200        lower_bound: Option<K>,
2201        upper_bound: Option<K>,
2202    ) -> Self::SafeIterator {
2203        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2204        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2205        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2206        SafeIter::new(
2207            self.cf.clone(),
2208            db_iter,
2209            _timer,
2210            _perf_ctx,
2211            bytes_scanned,
2212            keys_scanned,
2213            Some(self.db_metrics.clone()),
2214        )
2215    }
2216
2217    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2218        let readopts = self.create_read_options_with_range(range);
2219        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2220        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2221        SafeIter::new(
2222            self.cf.clone(),
2223            db_iter,
2224            _timer,
2225            _perf_ctx,
2226            bytes_scanned,
2227            keys_scanned,
2228            Some(self.db_metrics.clone()),
2229        )
2230    }
2231
2232    fn keys(&'a self) -> Self::Keys {
2233        let mut db_iter = self
2234            .rocksdb
2235            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2236        db_iter.seek_to_first();
2237
2238        Keys::new(db_iter)
2239    }
2240
2241    fn values(&'a self) -> Self::Values {
2242        let mut db_iter = self
2243            .rocksdb
2244            .raw_iterator_cf(&self.cf(), self.opts.readopts());
2245        db_iter.seek_to_first();
2246
2247        Values::new(db_iter)
2248    }
2249
2250    /// Returns a vector of raw values corresponding to the keys provided.
2251    #[instrument(level = "trace", skip_all, err)]
2252    fn multi_get_raw_bytes<J>(
2253        &self,
2254        keys: impl IntoIterator<Item = J>,
2255    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
2256    where
2257        J: Borrow<K>,
2258    {
2259        let results = self
2260            .multi_get_pinned(keys)?
2261            .into_iter()
2262            .map(|val| val.map(|v| v.to_vec()))
2263            .collect();
2264        Ok(results)
2265    }
2266
2267    /// Returns a vector of values corresponding to the keys provided.
2268    #[instrument(level = "trace", skip_all, err)]
2269    fn multi_get<J>(
2270        &self,
2271        keys: impl IntoIterator<Item = J>,
2272    ) -> Result<Vec<Option<V>>, TypedStoreError>
2273    where
2274        J: Borrow<K>,
2275    {
2276        let results = self.multi_get_pinned(keys)?;
2277        let values_parsed: Result<Vec<_>, TypedStoreError> = results
2278            .into_iter()
2279            .map(|value_byte| match value_byte {
2280                Some(data) => Ok(Some(
2281                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2282                )),
2283                None => Ok(None),
2284            })
2285            .collect();
2286
2287        values_parsed
2288    }
2289
2290    /// Returns a vector of values corresponding to the keys provided.
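    /// Keys are fetched in chunks of `chunk_size` against a single snapshot,
    /// which bounds peak memory when looking up very large key sets. A hedged
    /// sketch (illustrative only; `ids` is an assumed `Vec` of keys):
    ///
    /// ```ignore
    /// let values = db.chunked_multi_get(ids.iter(), 1_000)?;
    /// ```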
2291    #[instrument(level = "trace", skip_all, err)]
2292    fn chunked_multi_get<J>(
2293        &self,
2294        keys: impl IntoIterator<Item = J>,
2295        chunk_size: usize,
2296    ) -> Result<Vec<Option<V>>, TypedStoreError>
2297    where
2298        J: Borrow<K>,
2299    {
2300        let cf = self.cf();
2301        let keys_bytes = keys
2302            .into_iter()
2303            .map(|k| (&cf, be_fix_int_ser(k.borrow()).unwrap()));
2304        let chunked_keys = keys_bytes.into_iter().chunks(chunk_size);
2305        let snapshot = self.snapshot()?;
2306        let mut results = vec![];
2307        for chunk in chunked_keys.into_iter() {
2308            let chunk_result = snapshot.multi_get_cf(chunk);
2309            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
2310                .into_iter()
2311                .map(|value_byte| {
2312                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
2313                    match value_byte {
2314                        Some(data) => Ok(Some(
2315                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2316                        )),
2317                        None => Ok(None),
2318                    }
2319                })
2320                .collect();
2321            results.extend(values_parsed?);
2322        }
2323        Ok(results)
2324    }
2325
2326    /// Convenience method for batch insertion
2327    #[instrument(level = "trace", skip_all, err)]
2328    fn multi_insert<J, U>(
2329        &self,
2330        key_val_pairs: impl IntoIterator<Item = (J, U)>,
2331    ) -> Result<(), Self::Error>
2332    where
2333        J: Borrow<K>,
2334        U: Borrow<V>,
2335    {
2336        let mut batch = self.batch();
2337        batch.insert_batch(self, key_val_pairs)?;
2338        batch.write()
2339    }
2340
2341    /// Convenience method for batch removal
2342    #[instrument(level = "trace", skip_all, err)]
2343    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2344    where
2345        J: Borrow<K>,
2346    {
2347        let mut batch = self.batch();
2348        batch.delete_batch(self, keys)?;
2349        batch.write()
2350    }
2351
2352    /// Try to catch up with primary when running as secondary
2353    #[instrument(level = "trace", skip_all, err)]
2354    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2355        self.rocksdb
2356            .try_catch_up_with_primary()
2357            .map_err(typed_store_err_from_rocks_err)
2358    }
2359}
2360
2361impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
2362where
2363    J: Borrow<K>,
2364    U: Borrow<V>,
2365    K: Serialize,
2366    V: Serialize,
2367{
2368    type Error = TypedStoreError;
2369
2370    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
2371    where
2372        T: Iterator<Item = (J, U)>,
2373    {
2374        let mut batch = self.batch();
2375        batch.insert_batch(self, iter)?;
2376        batch.write()
2377    }
2378
2379    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
2380        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
2381        let mut batch = self.batch();
2382        batch.insert_batch(self, slice_of_refs)?;
2383        batch.write()
2384    }
2385}
2386
2387pub fn read_size_from_env(var_name: &str) -> Option<usize> {
2388    env::var(var_name)
2389        .ok()?
2390        .parse::<usize>()
2391        .tap_err(|e| {
2392            warn!(
2393                "Env var {} does not contain valid usize integer: {}",
2394                var_name, e
2395            )
2396        })
2397        .ok()
2398}
2399
2400#[derive(Clone, Debug)]
2401pub struct ReadWriteOptions {
2402    pub ignore_range_deletions: bool,
2403    // Whether to sync to disk on every write.
2404    sync_to_disk: bool,
2405}
2406
2407impl ReadWriteOptions {
2408    pub fn readopts(&self) -> ReadOptions {
2409        let mut readopts = ReadOptions::default();
2410        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
2411        readopts
2412    }
2413
2414    pub fn writeopts(&self) -> WriteOptions {
2415        let mut opts = WriteOptions::default();
2416        opts.set_sync(self.sync_to_disk);
2417        opts
2418    }
2419
2420    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
2421        self.ignore_range_deletions = ignore;
2422        self
2423    }
2424}
2425
2426impl Default for ReadWriteOptions {
2427    fn default() -> Self {
2428        Self {
2429            ignore_range_deletions: true,
2430            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
2431        }
2432    }
2433}
2434// TODO: refactor this into a builder pattern, where rocksdb::Options are
2435// generated after a call to build().
2436#[derive(Default, Clone)]
2437pub struct DBOptions {
2438    pub options: rocksdb::Options,
2439    pub rw_options: ReadWriteOptions,
2440}
2441
2442impl DBOptions {
2443    // Optimize lookup perf for tables where no scans are performed.
2444    // If a non-trivial number of values can be > 512B in size, it is beneficial to
2445    // also specify optimize_for_large_values_no_scan().
2446    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
2447        // NOTE: this overwrites the block options.
2448        self.options
2449            .optimize_for_point_lookup(block_cache_size_mb as u64);
2450        self
2451    }
2452
2453    // Optimize write and lookup perf for tables which are rarely scanned, and have
2454    // large values. https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
2455    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
2456        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
2457            info!("Large value blob storage optimization is disabled via env var.");
2458            return self;
2459        }
2460
2461        // Blob settings.
2462        self.options.set_enable_blob_files(true);
2463        self.options
2464            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
2465        self.options.set_enable_blob_gc(true);
2466        // Since each blob can have non-trivial size overhead, and compression does not
2467        // work across blobs, set a min blob size in bytes so that small
2468        // transactions and effects are kept in sst files.
2469        self.options.set_min_blob_size(min_blob_size);
2470
2471        // Increase write buffer size to 256MiB.
2472        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2473            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2474            * 1024
2475            * 1024;
2476        self.options.set_write_buffer_size(write_buffer_size);
2477        // Since large blobs are not in sst files, reduce the target file size and base
2478        // level target size.
2479        let target_file_size_base = 64 << 20;
2480        self.options
2481            .set_target_file_size_base(target_file_size_base);
2482        // Level 1 defaults to 64MiB * 4 ~ 256MiB.
2483        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2484            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2485        self.options
2486            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
2487
2488        self
2489    }
2490
2491    // Optimize tables with a mix of lookup and scan workloads.
2492    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
2493        self.options
2494            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
2495        self
2496    }
2497
2498    // Optimize DB receiving significant insertions.
2499    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
2500        self.options
2501            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
2502        self.options
2503            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
2504        self
2505    }
2506
2507    // Optimize tables receiving significant insertions.
2508    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
2509        // Increase write buffer size to 256MiB.
2510        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2511            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2512            * 1024
2513            * 1024;
2514        self.options.set_write_buffer_size(write_buffer_size);
2515        // Increase the number of write buffers kept to 6 before slowing down writes.
2516        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2517            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2518        self.options
2519            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2520        // Keep 1 write buffer so recent writes can be read from memory.
2521        self.options
2522            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2523
2524        // Set the compaction trigger for level 0 (4 files by default, overridable via env var).
2525        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2526            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2527        self.options.set_level_zero_file_num_compaction_trigger(
2528            max_level_zero_file_num.try_into().unwrap(),
2529        );
2530        self.options.set_level_zero_slowdown_writes_trigger(
2531            (max_level_zero_file_num * 12).try_into().unwrap(),
2532        );
2533        self.options
2534            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2535
2536        // Increase sst file size to 128MiB.
2537        self.options.set_target_file_size_base(
2538            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2539                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2540                * 1024
2541                * 1024,
2542        );
2543
2544        // Increase level 1 target size to write_buffer_size * L0 trigger (256MiB * 4 ~ 1GiB by default).
2545        self.options
2546            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2547
2548        self
2549    }
2550
2551    // Optimize tables receiving significant insertions, without any deletions.
2552    // TODO: merge this function with optimize_for_write_throughput(), and use a
2553    // flag to indicate if deletion is received.
2554    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
2555        // Increase write buffer size to 256MiB.
2556        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2557            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2558            * 1024
2559            * 1024;
2560        self.options.set_write_buffer_size(write_buffer_size);
2561        // Increase the number of write buffers kept to 6 before slowing down writes.
2562        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2563            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2564        self.options
2565            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2566        // Keep 1 write buffer so recent writes can be read from memory.
2567        self.options
2568            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2569
2570        // Switch to universal compactions.
2571        self.options
2572            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
2573        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
2574        compaction_options.set_max_size_amplification_percent(10000);
2575        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
2576        self.options
2577            .set_universal_compaction_options(&compaction_options);
2578
2579        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2580            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
2581        self.options.set_level_zero_file_num_compaction_trigger(
2582            max_level_zero_file_num.try_into().unwrap(),
2583        );
2584        self.options.set_level_zero_slowdown_writes_trigger(
2585            (max_level_zero_file_num * 12).try_into().unwrap(),
2586        );
2587        self.options
2588            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2589
2590        // Increase sst file size to 128MiB.
2591        self.options.set_target_file_size_base(
2592            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2593                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2594                * 1024
2595                * 1024,
2596        );
2597
2598        // This should be a no-op for universal compaction, but we increase it to be safe.
2599        self.options
2600            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2601
2602        self
2603    }
2604
2605    // Overrides the block options with different block cache size and block size.
2606    pub fn set_block_options(
2607        mut self,
2608        block_cache_size_mb: usize,
2609        block_size_bytes: usize,
2610    ) -> DBOptions {
2611        self.options
2612            .set_block_based_table_factory(&get_block_options(
2613                block_cache_size_mb,
2614                block_size_bytes,
2615            ));
2616        self
2617    }
2618
2619    // Disables write stalling and stopping based on pending compaction bytes.
2620    pub fn disable_write_throttling(mut self) -> DBOptions {
2621        self.options.set_soft_pending_compaction_bytes_limit(0);
2622        self.options.set_hard_pending_compaction_bytes_limit(0);
2623        self
2624    }
2625}
2626
2627/// Creates the default RocksDB options, to be used when no specific options
2628/// are provided.
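///
/// A hedged composition sketch (illustrative only; the path, column family
/// name and `metric_conf` value are made up):
///
/// ```ignore
/// let db_opts = default_db_options().optimize_for_write_throughput();
/// let db = open_cf_opts(
///     "/tmp/example-db",
///     Some(db_opts.options.clone()),
///     metric_conf,
///     &[("events", db_opts.options)],
/// )?;
/// ```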
2629pub fn default_db_options() -> DBOptions {
2630    let mut opt = rocksdb::Options::default();
2631
2632    // One common issue when running tests on Mac is that the default ulimit is too
2633    // low, leading to I/O errors such as "Too many open files". Raise the fd
2634    // limit to work around it.
2635    if let Some(limit) = fdlimit::raise_fd_limit() {
2636        // On Windows, raise_fd_limit returns None.
2637        opt.set_max_open_files((limit / 8) as i32);
2638    }
2639
2640    // The table cache is locked for updates and this determines the number
2641    // of shards, i.e. 2^10. Increase it in case of lock contention.
2642    opt.set_table_cache_num_shard_bits(10);
2643
2644    // LSM compression settings
2645    opt.set_min_level_to_compress(2);
2646    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
2647    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
2648    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
2649
2650    // IOTA uses multiple RocksDB instances in a node, so the total sizes of write buffers and WAL
2651    // can be higher than the limits below.
2652    //
2653    // RocksDB also exposes the option to configure total write buffer size across
2654    // multiple instances via `write_buffer_manager`. But the write buffer flush
2655    // policy (flushing the buffer receiving the next write) may not work well.
2656    // So we stick to a per-DB write buffer size limit for now.
2657    //
2658    // The environment variables are only meant to be emergency overrides. They may
2659    // go away in the future. It is preferable to update the default value, or
2660    // override the option in code.
2661    opt.set_db_write_buffer_size(
2662        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
2663            * 1024
2664            * 1024,
2665    );
2666    opt.set_max_total_wal_size(
2667        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
2668    );
2669
2670    // Num threads for compactions and memtable flushes.
2671    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
2672
2673    opt.set_enable_pipelined_write(true);
2674
2675    // Increase block size to 16KiB.
2676    // https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md#indexes-and-filter-blocks
2677    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
2678
2679    // Set memtable bloomfilter.
2680    opt.set_memtable_prefix_bloom_ratio(0.02);
2681
2682    DBOptions {
2683        options: opt,
2684        rw_options: ReadWriteOptions::default(),
2685    }
2686}
2687
2688fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
2689    // Set options mostly similar to those used in optimize_for_point_lookup(),
2690    // except for the non-default binary-and-hash index, to hopefully reduce lookup
2691    // latencies without causing any regression for scans, at the cost of slightly
2692    // more memory usage. https://github.com/facebook/rocksdb/blob/11cb6af6e5009c51794641905ca40ce5beec7fee/options/options.cc#L611-L621
2693    let mut block_options = BlockBasedOptions::default();
2694    // Overrides block size.
2695    block_options.set_block_size(block_size_bytes);
2696    // Configure a block cache.
2697    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
2698    // Set a bloomfilter with 1% false positive rate.
2699    block_options.set_bloom_filter(10.0, false);
2700    // From https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#caching-index-and-filter-blocks
2701    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
2702    block_options
2703}
2704
2705/// Opens a database with options, and a number of column families that are
2706/// created if they do not exist.
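///
/// A hedged sketch (illustrative only; the path, `metric_conf` value and
/// column family names are made up):
///
/// ```ignore
/// let rocks = open_cf("/tmp/example-db", None, metric_conf, &["objects", "events"])?;
/// ```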
2707#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
2708pub fn open_cf<P: AsRef<Path>>(
2709    path: P,
2710    db_options: Option<rocksdb::Options>,
2711    metric_conf: MetricConf,
2712    opt_cfs: &[&str],
2713) -> Result<Arc<RocksDB>, TypedStoreError> {
2714    let options = db_options.unwrap_or_else(|| default_db_options().options);
2715    let column_descriptors: Vec<_> = opt_cfs
2716        .iter()
2717        .map(|name| (*name, options.clone()))
2718        .collect();
2719    open_cf_opts(
2720        path,
2721        Some(options.clone()),
2722        metric_conf,
2723        &column_descriptors[..],
2724    )
2725}
2726
2727fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
2728    // Customize database options
2729    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2730    options.create_if_missing(true);
2731    options.create_missing_column_families(true);
2732    options
2733}
2734
2735/// Opens a database with options, and a number of column families with
2736/// individual options that are created if they do not exist.
2737#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2738pub fn open_cf_opts<P: AsRef<Path>>(
2739    path: P,
2740    db_options: Option<rocksdb::Options>,
2741    metric_conf: MetricConf,
2742    opt_cfs: &[(&str, rocksdb::Options)],
2743) -> Result<Arc<RocksDB>, TypedStoreError> {
2744    let path = path.as_ref();
2745    // In the simulator, we intercept the wall clock in the test thread only. This
2746    // causes problems because rocksdb uses the simulated clock when creating
2747    // its background threads, but then those threads see the real wall clock
2748    // (because they are not the test thread), which causes rocksdb to panic.
2749    // The `nondeterministic` macro evaluates expressions in new threads, which
2750    // resolves the issue.
2751    //
2752    // This is a no-op in non-simulator builds.
2753
2754    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2755    nondeterministic!({
2756        let options = prepare_db_options(db_options);
2757        let rocksdb = {
2758            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2759                &options,
2760                path,
2761                cfs.into_iter()
2762                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2763            )
2764            .map_err(typed_store_err_from_rocks_err)?
2765        };
2766        Ok(Arc::new(RocksDB::DBWithThreadMode(
2767            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2768        )))
2769    })
2770}
2771
2772/// Opens a database with options, and a number of column families with
2773/// individual options that are created if they do not exist.
2774#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2775pub fn open_cf_opts_transactional<P: AsRef<Path>>(
2776    path: P,
2777    db_options: Option<rocksdb::Options>,
2778    metric_conf: MetricConf,
2779    opt_cfs: &[(&str, rocksdb::Options)],
2780) -> Result<Arc<RocksDB>, TypedStoreError> {
2781    let path = path.as_ref();
2782    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2783    // See comment above for explanation of why nondeterministic is necessary here.
2784    nondeterministic!({
2785        let options = prepare_db_options(db_options);
2786        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
2787            &options,
2788            path,
2789            cfs.into_iter()
2790                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2791        )
2792        .map_err(typed_store_err_from_rocks_err)?;
2793        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
2794            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2795        )))
2796    })
2797}
2798
2799/// Opens a database as a secondary instance with options, and a number of
2800/// column families with individual options that are created if they do not exist.
2801pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2802    primary_path: P,
2803    secondary_path: Option<P>,
2804    db_options: Option<rocksdb::Options>,
2805    metric_conf: MetricConf,
2806    opt_cfs: &[(&str, rocksdb::Options)],
2807) -> Result<Arc<RocksDB>, TypedStoreError> {
2808    let primary_path = primary_path.as_ref();
2809    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2810    // See comment above for explanation of why nondeterministic is necessary here.
2811    nondeterministic!({
2812        // Customize database options
2813        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2814
2815        fdlimit::raise_fd_limit();
2816        // This is a requirement by RocksDB when opening as secondary
2817        options.set_max_open_files(-1);
2818
2819        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2820        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2821            .ok()
2822            .unwrap_or_default();
2823
2824        let default_db_options = default_db_options();
2825        // Add CFs not explicitly listed
2826        for cf_key in cfs.iter() {
2827            if !opt_cfs.contains_key(&cf_key[..]) {
2828                opt_cfs.insert(cf_key, default_db_options.options.clone());
2829            }
2830        }
2831
2832        let primary_path = primary_path.to_path_buf();
2833        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2834            let mut s = primary_path.clone();
2835            s.pop();
2836            s.push("SECONDARY");
2837            s.as_path().to_path_buf()
2838        });
2839
2840        let rocksdb = {
2841            options.create_if_missing(true);
2842            options.create_missing_column_families(true);
2843            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2844                &options,
2845                &primary_path,
2846                &secondary_path,
2847                opt_cfs
2848                    .iter()
2849                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2850            )
2851            .map_err(typed_store_err_from_rocks_err)?;
2852            db.try_catch_up_with_primary()
2853                .map_err(typed_store_err_from_rocks_err)?;
2854            db
2855        };
2856        Ok(Arc::new(RocksDB::DBWithThreadMode(
2857            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
2858        )))
2859    })
2860}
2861
2862pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2863    const DB_DEFAULT_CF_NAME: &str = "default";
2864
2865    let opts = rocksdb::Options::default();
2866    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2867        .map_err(|e| e.into())
2868        .map(|q| {
2869            q.iter()
2870                .filter_map(|s| {
2871                    // The `default` table is not used
2872                    if s != DB_DEFAULT_CF_NAME {
2873                        Some(s.clone())
2874                    } else {
2875                        None
2876                    }
2877                })
2878                .collect()
2879        })
2880}
2881
2882/// Serializes to a big-endian, fixed-width integer encoding so that the lexicographic order of the bytes matches the logical order of keys: RocksDB stores keys in BE and has a seek operator on iterators, see `https://github.com/facebook/rocksdb/wiki/Iterator#introduction`
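///
/// A small sketch of the ordering property this preserves (illustrative only):
///
/// ```ignore
/// let a = be_fix_int_ser(&1u64).unwrap();
/// let b = be_fix_int_ser(&2u64).unwrap();
/// assert!(a < b); // byte order matches numeric order, so range seeks behave as expected
/// ```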
2883#[inline]
2884pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2885where
2886    S: ?Sized + serde::Serialize,
2887{
2888    bincode::DefaultOptions::new()
2889        .with_big_endian()
2890        .with_fixint_encoding()
2891        .serialize(t)
2892        .map_err(typed_store_err_from_bincode_err)
2893}
2894
2895#[derive(Clone)]
2896pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
2897impl DBMapTableConfigMap {
2898    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
2899        Self(map)
2900    }
2901
2902    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
2903        self.0.clone()
2904    }
2905}
2906
2907pub enum RocksDBAccessType {
2908    Primary,
2909    Secondary(Option<PathBuf>),
2910}
2911
2912pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
2913    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
2914}
2915
2916fn populate_missing_cfs(
2917    input_cfs: &[(&str, rocksdb::Options)],
2918    path: &Path,
2919) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2920    let mut cfs = vec![];
2921    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2922    let existing_cfs =
2923        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2924            .ok()
2925            .unwrap_or_default();
2926
2927    for cf_name in existing_cfs {
2928        if !input_cf_index.contains(&cf_name[..]) {
2929            cfs.push((cf_name, rocksdb::Options::default()));
2930        }
2931    }
2932    cfs.extend(
2933        input_cfs
2934            .iter()
2935            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2936    );
2937    Ok(cfs)
2938}
2939
2940/// Given a vec<u8>, find the value which is one more than the vector
2941/// if the vector is interpreted as a big-endian number.
2942/// If the vector is already at the maximum (all 0xFF), don't change it.
2943fn big_endian_saturating_add_one(v: &mut [u8]) {
2944    if is_max(v) {
2945        return;
2946    }
2947    for i in (0..v.len()).rev() {
2948        if v[i] == u8::MAX {
2949            v[i] = 0;
2950        } else {
2951            v[i] += 1;
2952            break;
2953        }
2954    }
2955}
2956
2957/// Check if all the bytes in the vector are 0xFF
2958fn is_max(v: &[u8]) -> bool {
2959    v.iter().all(|&x| x == u8::MAX)
2960}
2961
2962// `clippy::manual_div_ceil` is raised by code expanded by the
2963// `uint::construct_uint!` macro so it needs to be fixed by `uint`
2964#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
2965#[test]
2966fn test_helpers() {
2967    let v = vec![];
2968    assert!(is_max(&v));
2969
2970    fn check_add(v: Vec<u8>) {
2971        let mut v = v;
2972        let num = Num32::from_big_endian(&v);
2973        big_endian_saturating_add_one(&mut v);
2974        assert!(num + 1 == Num32::from_big_endian(&v));
2975    }
2976
2977    uint::construct_uint! {
2978        // 32 byte number
2979        struct Num32(4);
2980    }
2981
2982    let mut v = vec![255; 32];
2983    big_endian_saturating_add_one(&mut v);
2984    assert!(Num32::MAX == Num32::from_big_endian(&v));
2985
2986    check_add(vec![1; 32]);
2987    check_add(vec![6; 32]);
2988    check_add(vec![254; 32]);
2989
2990    // TBD: More tests coming with randomized arrays
2991}