typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod errors;
6pub(crate) mod safe_iter;
7
8use std::{
9    borrow::Borrow,
10    collections::{BTreeMap, HashSet},
11    env,
12    ffi::CStr,
13    marker::PhantomData,
14    ops::{Bound, RangeBounds},
15    path::{Path, PathBuf},
16    sync::Arc,
17    time::Duration,
18};
19
20use backoff::backoff::Backoff;
21use bincode::Options;
22use iota_macros::{fail_point, nondeterministic};
23use prometheus::{Histogram, HistogramTimer};
24use rocksdb::{
25    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
26    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, LiveFile,
27    MultiThreaded, OptimisticTransactionDB, ReadOptions, SnapshotWithThreadMode, WriteBatch,
28    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
29};
30use serde::{Serialize, de::DeserializeOwned};
31use tap::TapFallible;
32use tokio::sync::oneshot;
33use tracing::{debug, error, info, instrument, warn};
34
35use crate::{
36    TypedStoreError,
37    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
38    rocks::{
39        errors::{
40            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
41            typed_store_err_from_rocks_err,
42        },
43        safe_iter::{SafeIter, SafeRevIter},
44    },
45    traits::{Map, TableSummary},
46};
47
// Write buffer size per RocksDB instance can be set via the env var below.
// If the env var is not set, use the default value in MiB.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// Write ahead log size per RocksDB instance can be set via the env var below.
// If the env var is not set, use the default value in MiB.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Environment variable to control behavior of write throughput optimized
// tables.
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// Set to 1 to disable blob storage for transactions and effects.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

// NOTE(review): presumably controls RocksDB background parallelism; the env
// var is read where DB options are built (not visible in this chunk).
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
// SAFETY: the byte string literal is NUL-terminated and contains no interior
// NUL bytes, which is exactly the precondition of
// `CStr::from_bytes_with_nul_unchecked`.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

// Key under which the corruption marker is stored; see
// `check_and_mark_db_corruption` / `unmark_db_corruption`.
const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";

#[cfg(test)]
mod tests;
84
85/// A helper macro to reopen multiple column families. The macro returns
86/// a tuple of DBMap structs in the same order that the column families
87/// are defined.
88///
89/// # Arguments
90///
91/// * `db` - a reference to a rocks DB object
92/// * `cf;<ty,ty>` - a comma separated list of column families to open. For each
93///   column family a concatenation of column family name (cf) and Key-Value
94///   <ty, ty> should be provided.
95///
96/// # Examples
97///
98/// We successfully open two different column families.
99/// ```
100/// use typed_store::reopen;
101/// use typed_store::rocks::*;
102/// use tempfile::tempdir;
103/// use prometheus::Registry;
104/// use std::sync::Arc;
105/// use typed_store::metrics::DBMetrics;
106/// use core::fmt::Error;
107///
108/// #[tokio::main]
109/// async fn main() -> Result<(), Error> {
110/// const FIRST_CF: &str = "First_CF";
111/// const SECOND_CF: &str = "Second_CF";
112///
113///
114/// /// Create the rocks database reference for the desired column families
115/// let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &[FIRST_CF, SECOND_CF]).unwrap();
116///
117/// /// Now simply open all the column families for their expected Key-Value types
118/// let (db_map_1, db_map_2) = reopen!(&rocks, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
119/// Ok(())
120/// }
121/// ```
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                // `unwrap_or_else` avoids eagerly building the panic message on
                // the success path (clippy: expect_fun_call) and, unlike the
                // previous `expect`, includes the underlying error in the panic.
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false)
                    .unwrap_or_else(|e| panic!("Cannot open {} CF: {e:?}", $cf))
            ),*
        )
    };
}
132
/// Owning wrapper around a multi-threaded RocksDB handle, bundling the
/// metrics configuration and the on-disk path it was opened at.
#[derive(Debug)]
pub struct RocksDB {
    // The actual rocksdb database handle.
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    // Metrics naming/sampling configuration used for reporting.
    pub metric_conf: MetricConf,
    // Path the database was opened at.
    pub db_path: PathBuf,
}
139
impl Drop for RocksDB {
    fn drop(&mut self) {
        // Stop compactions/flushes (waiting for them to finish) before the
        // handle is destroyed, then undo the active-DB gauge increment done
        // in `RocksDB::new`.
        self.underlying.cancel_all_background_work(/* wait */ true);
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}
146
impl RocksDB {
    /// Wraps an opened RocksDB handle and registers it with the active-DB
    /// gauge; the matching decrement happens in `Drop`.
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }

    /// Reads a raw value by key from the default column family.
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        self.underlying.get(key)
    }

    /// Batched point lookups across (column family, key) pairs with the
    /// given read options; one result per input pair, in input order.
    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        self.underlying.multi_get_cf_opt(keys, readopts)
    }

    /// Batched point lookups within a single column family, returning pinned
    /// slices so values need not be copied out of RocksDB-owned memory.
    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        self.underlying
            .batched_multi_get_cf_opt(cf, keys, sorted_input, readopts)
    }

    /// Reads an integer-valued RocksDB property for the given column family.
    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        self.underlying.property_int_value_cf(cf, name)
    }

    /// Point lookup in `cf` returning a pinned (zero-copy) slice.
    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        self.underlying.get_pinned_cf_opt(cf, key, readopts)
    }

    /// Resolves a column family handle by name, if it exists.
    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        self.underlying.cf_handle(name)
    }

    /// Creates a new column family with the given options.
    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        self.underlying.create_cf(name, opts)
    }

    /// Drops (deletes) a column family by name.
    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        self.underlying.drop_cf(name)
    }

    /// Deletes a key from `cf`. The fail points allow tests to inject
    /// crashes immediately before/after the write.
    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = self.underlying.delete_cf_opt(cf, key, writeopts);
        fail_point!("delete-cf-after");
        // The binding exists so the "after" fail point runs post-write.
        #[expect(clippy::let_and_return)]
        ret
    }

    /// Filesystem path the database was opened at.
    pub fn path(&self) -> &Path {
        self.underlying.path()
    }

    /// Writes a key/value pair into `cf`. Fail points mirror `delete_cf`.
    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = self.underlying.put_cf_opt(cf, key, value, writeopts);
        fail_point!("put-cf-after");
        // The binding exists so the "after" fail point runs post-write.
        #[expect(clippy::let_and_return)]
        ret
    }

    /// Cheap existence pre-check: `false` means definitely absent, `true`
    /// means the key *may* exist (bloom-filter semantics).
    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        self.underlying.key_may_exist_cf_opt(cf, key, readopts)
    }

    /// For secondary instances: catch up with the primary's latest state.
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.underlying.try_catch_up_with_primary()
    }

    /// Atomically applies a write batch with the given write options.
    pub fn write(
        &self,
        batch: rocksdb::WriteBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        self.underlying
            .write_opt(batch, writeopts)
            .map_err(typed_store_err_from_rocks_err)?;
        fail_point!("batch-write-after");

        Ok(())
    }

    /// Creates a raw (unchecked) iterator over `cf_handle` with the given
    /// read options.
    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        self.underlying.raw_iterator_cf_opt(cf_handle, readopts)
    }

    /// Manually compacts the key range `[start, end]` in `cf` (None = open
    /// ended).
    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        self.underlying.compact_range_cf(cf, start, end)
    }

    /// Like `compact_range_cf`, but forces the bottommost level to be
    /// rewritten as well (`BottommostLevelCompaction::ForceOptimized`).
    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        self.underlying.compact_range_cf_opt(cf, start, end, opt)
    }

    /// Flushes all memtables of the default column family to disk.
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.underlying
            .flush()
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    /// Creates an on-disk checkpoint (hard-linked snapshot) of the database
    /// at `path`.
    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint =
            Checkpoint::new(&self.underlying).map_err(typed_store_err_from_rocks_err)?;
        // NOTE(review): uses `e.to_string()` while sibling methods use
        // `into_string()` / `typed_store_err_from_rocks_err` — consider
        // unifying the error mapping.
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    /// Flushes the memtables of a specific column family to disk.
    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        self.underlying.flush_cf(cf)
    }

    /// Dynamically updates RocksDB options on a column family.
    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        self.underlying.set_options_cf(cf, opts)
    }

    /// Fresh sampling interval for point-read metrics.
    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for multi-get metrics (shares the read
    /// interval configuration).
    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for write metrics.
    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for iterator metrics.
    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    /// Name used for metrics: the configured name, or the directory name of
    /// the DB path when none was configured.
    pub fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    // Falls back to the last component of the DB path, or "unknown".
    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    /// Lists the live SST files of the database.
    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        self.underlying.live_files()
    }
}
380
381// Check if the database is corrupted, and if so, panic.
382// If the corrupted key is not set, we set it to [1].
383pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
384    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
385
386    db.get(DB_CORRUPTED_KEY)
387        .map_err(|e| format!("Failed to open database: {e}"))
388        .and_then(|value| match value {
389            Some(v) if v[0] == 1 => Err(
390                "Database is corrupted, please remove the current database and start clean!"
391                    .to_string(),
392            ),
393            Some(_) => Ok(()),
394            None => db
395                .put(DB_CORRUPTED_KEY, [1])
396                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
397        })?;
398
399    Ok(())
400}
401
402pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
403    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
404}
405
/// A point-in-time snapshot over either DB flavor, allowing consistent reads
/// regardless of concurrent writes.
pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}
410
impl<'a> RocksDBSnapshot<'a> {
    /// Batched point lookups against this snapshot with explicit read
    /// options; one result per (column family, key) pair, in input order.
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }
    /// Batched point lookups against this snapshot using default read
    /// options.
    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}
442
/// Metrics configuration for one database: the label used in metric series
/// and the sampling intervals for read/write/iterator instrumentation.
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}
450
impl MetricConf {
    /// Creates a config with default sampling intervals. An empty name is
    /// tolerated but logged as an error since it produces unhelpful metrics.
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    /// Replaces the read sampling interval. Note: the write and iterator
    /// intervals are reset to their defaults, not carried over from `self`.
    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}
// How often the per-column-family metrics reporting task wakes up.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
// Sentinel gauge value recorded when reading a RocksDB property fails.
const METRICS_ERROR: i64 = -1;
475
/// An interface to a rocksDB database, keyed by a columnfamily
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    // `fn(K) -> V` ties the type parameters to the map without owning any
    // K or V values.
    _phantom: PhantomData<fn(K) -> V>,
    // the rocksDB ColumnFamily under which the map is stored
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    // Per-operation sampling intervals, cloned from the DB's configuration.
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    // Dropping the last clone drops the sender, which signals the background
    // metrics task (spawned in `new`) to exit.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
491
// SAFETY: `K` and `V` appear only inside `PhantomData<fn(K) -> V>`; no values
// of either type are stored. NOTE(review): `PhantomData<fn(K) -> V>` is `Send`
// for any `K`/`V`, so this impl looks redundant with an auto-derived `Send` —
// confirm whether a non-`Send` field motivates it before removing.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
493
494impl<K, V> DBMap<K, V> {
    /// Builds a `DBMap` over column family `opt_cf` of `db` and (unless
    /// `is_deprecated`) spawns a background task that reports this column
    /// family's metrics every `CF_METRICS_REPORT_PERIOD_SECS` seconds.
    ///
    /// The task exits when the oneshot sender stored in
    /// `_metrics_task_cancel_handle` is dropped (i.e. when the last clone of
    /// this map is dropped), which completes the `recv` select arm.
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            // Property reads can block, so run them off the
                            // async executor.
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }
541
542    /// Opens a database from a path, with specific options and an optional
543    /// column family.
544    ///
545    /// This database is used to perform operations on single column family, and
546    /// parametrizes all operations in `DBBatch` when writing across column
547    /// families.
548    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
549    pub fn open<P: AsRef<Path>>(
550        path: P,
551        metric_conf: MetricConf,
552        db_options: Option<rocksdb::Options>,
553        opt_cf: Option<&str>,
554        rw_options: &ReadWriteOptions,
555    ) -> Result<Self, TypedStoreError> {
556        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
557        let cfs = vec![cf_key];
558        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
559        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
560    }
561
562    /// Reopens an open database as a typed map operating under a specific
563    /// column family. if no column family is passed, the default column
564    /// family is used.
565    ///
566    /// ```
567    /// use core::fmt::Error;
568    /// use std::sync::Arc;
569    ///
570    /// use prometheus::Registry;
571    /// use tempfile::tempdir;
572    /// use typed_store::{metrics::DBMetrics, rocks::*};
573    /// #[tokio::main]
574    /// async fn main() -> Result<(), Error> {
575    ///     /// Open the DB with all needed column families first.
576    ///     let rocks = open_cf(
577    ///         tempdir().unwrap(),
578    ///         None,
579    ///         MetricConf::default(),
580    ///         &["First_CF", "Second_CF"],
581    ///     )
582    ///     .unwrap();
583    ///     /// Attach the column families to specific maps.
584    ///     let db_cf_1 = DBMap::<u32, u32>::reopen(
585    ///         &rocks,
586    ///         Some("First_CF"),
587    ///         &ReadWriteOptions::default(),
588    ///         false,
589    ///     )
590    ///     .expect("Failed to open storage");
591    ///     let db_cf_2 = DBMap::<u32, u32>::reopen(
592    ///         &rocks,
593    ///         Some("Second_CF"),
594    ///         &ReadWriteOptions::default(),
595    ///         false,
596    ///     )
597    ///     .expect("Failed to open storage");
598    ///     Ok(())
599    /// }
600    /// ```
601    #[instrument(level = "debug", skip(db), err)]
602    pub fn reopen(
603        db: &Arc<RocksDB>,
604        opt_cf: Option<&str>,
605        rw_options: &ReadWriteOptions,
606        is_deprecated: bool,
607    ) -> Result<Self, TypedStoreError> {
608        let cf_key = opt_cf
609            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
610            .to_owned();
611
612        db.cf_handle(&cf_key)
613            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;
614
615        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
616    }
617
618    pub fn batch(&self) -> DBBatch {
619        let batch = WriteBatch::default();
620        DBBatch::new(
621            &self.rocksdb,
622            batch,
623            self.opts.writeopts(),
624            &self.db_metrics,
625            &self.write_sample_interval,
626        )
627    }
628
629    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
630        let from_buf = be_fix_int_ser(start)?;
631        let to_buf = be_fix_int_ser(end)?;
632        self.rocksdb
633            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
634        Ok(())
635    }
636
637    pub fn compact_range_raw(
638        &self,
639        cf_name: &str,
640        start: Vec<u8>,
641        end: Vec<u8>,
642    ) -> Result<(), TypedStoreError> {
643        let cf = self
644            .rocksdb
645            .cf_handle(cf_name)
646            .expect("compact range: column family does not exist");
647        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
648        Ok(())
649    }
650
651    pub fn compact_range_to_bottom<J: Serialize>(
652        &self,
653        start: &J,
654        end: &J,
655    ) -> Result<(), TypedStoreError> {
656        let from_buf = be_fix_int_ser(start)?;
657        let to_buf = be_fix_int_ser(end)?;
658        self.rocksdb
659            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
660        Ok(())
661    }
662
    /// Returns the bound column family handle for this map.
    ///
    /// # Panics
    ///
    /// Panics if the column family no longer exists; its presence was
    /// verified when the `DBMap` was created (see `reopen`), so a miss here
    /// indicates the CF was dropped out from under the map.
    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }
668
669    pub fn flush(&self) -> Result<(), TypedStoreError> {
670        self.rocksdb
671            .flush_cf(&self.cf())
672            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
673    }
674
    /// Dynamically updates RocksDB options on this map's column family.
    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }
678
679    fn get_int_property(
680        rocksdb: &RocksDB,
681        cf: &impl AsColumnFamilyRef,
682        property_name: &std::ffi::CStr,
683    ) -> Result<i64, TypedStoreError> {
684        match rocksdb.property_int_value_cf(cf, property_name) {
685            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
686            Ok(None) => Ok(0),
687            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
688        }
689    }
690
    /// Returns a vector of raw values corresponding to the keys provided.
    ///
    /// Each element is `Some(pinned slice)` for a present key and `None` for
    /// a missing one, in input order. Records multi-get latency and total
    /// bytes read; a sampled subset of calls additionally collects RocksDB
    /// perf-context metrics.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        // Timer observes the latency histogram when dropped at scope end.
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Only a sampled fraction of calls pays for perf-context collection.
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        // Serialize all keys up front; the first failure aborts the call.
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                // sorted_keys=
                false,
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        // Total bytes across all found values (misses contribute nothing).
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }
745
746    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
747        let Some(cf) = rocksdb.cf_handle(cf_name) else {
748            tracing::warn!(
749                "unable to report metrics for cf {cf_name:?} in db {:?}",
750                rocksdb.db_name()
751            );
752            return;
753        };
754
755        db_metrics
756            .cf_metrics
757            .rocksdb_total_sst_files_size
758            .with_label_values(&[cf_name])
759            .set(
760                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
761                    .unwrap_or(METRICS_ERROR),
762            );
763        db_metrics
764            .cf_metrics
765            .rocksdb_total_blob_files_size
766            .with_label_values(&[cf_name])
767            .set(
768                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
769                    .unwrap_or(METRICS_ERROR),
770            );
771        // 7 is the default number of levels in RocksDB. If we ever change the number of
772        // levels using `set_num_levels`, we need to update here as well. Note
773        // that there isn't an API to query the DB to get the number of levels (yet).
774        let total_num_files: i64 = (0..=6)
775            .map(|level| {
776                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
777                    .unwrap_or(METRICS_ERROR)
778            })
779            .sum();
780        db_metrics
781            .cf_metrics
782            .rocksdb_total_num_files
783            .with_label_values(&[cf_name])
784            .set(total_num_files);
785        db_metrics
786            .cf_metrics
787            .rocksdb_num_level0_files
788            .with_label_values(&[cf_name])
789            .set(
790                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
791                    .unwrap_or(METRICS_ERROR),
792            );
793        db_metrics
794            .cf_metrics
795            .rocksdb_current_size_active_mem_tables
796            .with_label_values(&[cf_name])
797            .set(
798                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
799                    .unwrap_or(METRICS_ERROR),
800            );
801        db_metrics
802            .cf_metrics
803            .rocksdb_size_all_mem_tables
804            .with_label_values(&[cf_name])
805            .set(
806                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
807                    .unwrap_or(METRICS_ERROR),
808            );
809        db_metrics
810            .cf_metrics
811            .rocksdb_num_snapshots
812            .with_label_values(&[cf_name])
813            .set(
814                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
815                    .unwrap_or(METRICS_ERROR),
816            );
817        db_metrics
818            .cf_metrics
819            .rocksdb_oldest_snapshot_time
820            .with_label_values(&[cf_name])
821            .set(
822                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
823                    .unwrap_or(METRICS_ERROR),
824            );
825        db_metrics
826            .cf_metrics
827            .rocksdb_actual_delayed_write_rate
828            .with_label_values(&[cf_name])
829            .set(
830                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
831                    .unwrap_or(METRICS_ERROR),
832            );
833        db_metrics
834            .cf_metrics
835            .rocksdb_is_write_stopped
836            .with_label_values(&[cf_name])
837            .set(
838                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
839                    .unwrap_or(METRICS_ERROR),
840            );
841        db_metrics
842            .cf_metrics
843            .rocksdb_block_cache_capacity
844            .with_label_values(&[cf_name])
845            .set(
846                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
847                    .unwrap_or(METRICS_ERROR),
848            );
849        db_metrics
850            .cf_metrics
851            .rocksdb_block_cache_usage
852            .with_label_values(&[cf_name])
853            .set(
854                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
855                    .unwrap_or(METRICS_ERROR),
856            );
857        db_metrics
858            .cf_metrics
859            .rocksdb_block_cache_pinned_usage
860            .with_label_values(&[cf_name])
861            .set(
862                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
863                    .unwrap_or(METRICS_ERROR),
864            );
865        db_metrics
866            .cf_metrics
867            .rocksdb_estimate_table_readers_mem
868            .with_label_values(&[cf_name])
869            .set(
870                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
871                    .unwrap_or(METRICS_ERROR),
872            );
873        db_metrics
874            .cf_metrics
875            .rocksdb_estimated_num_keys
876            .with_label_values(&[cf_name])
877            .set(
878                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
879                    .unwrap_or(METRICS_ERROR),
880            );
881        db_metrics
882            .cf_metrics
883            .rocksdb_num_immutable_mem_tables
884            .with_label_values(&[cf_name])
885            .set(
886                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
887                    .unwrap_or(METRICS_ERROR),
888            );
889        db_metrics
890            .cf_metrics
891            .rocksdb_mem_table_flush_pending
892            .with_label_values(&[cf_name])
893            .set(
894                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
895                    .unwrap_or(METRICS_ERROR),
896            );
897        db_metrics
898            .cf_metrics
899            .rocksdb_compaction_pending
900            .with_label_values(&[cf_name])
901            .set(
902                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
903                    .unwrap_or(METRICS_ERROR),
904            );
905        db_metrics
906            .cf_metrics
907            .rocksdb_estimate_pending_compaction_bytes
908            .with_label_values(&[cf_name])
909            .set(
910                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
911                    .unwrap_or(METRICS_ERROR),
912            );
913        db_metrics
914            .cf_metrics
915            .rocksdb_num_running_compactions
916            .with_label_values(&[cf_name])
917            .set(
918                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
919                    .unwrap_or(METRICS_ERROR),
920            );
921        db_metrics
922            .cf_metrics
923            .rocksdb_num_running_flushes
924            .with_label_values(&[cf_name])
925            .set(
926                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
927                    .unwrap_or(METRICS_ERROR),
928            );
929        db_metrics
930            .cf_metrics
931            .rocksdb_estimate_oldest_key_time
932            .with_label_values(&[cf_name])
933            .set(
934                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
935                    .unwrap_or(METRICS_ERROR),
936            );
937        db_metrics
938            .cf_metrics
939            .rocksdb_background_errors
940            .with_label_values(&[cf_name])
941            .set(
942                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
943                    .unwrap_or(METRICS_ERROR),
944            );
945        db_metrics
946            .cf_metrics
947            .rocksdb_base_level
948            .with_label_values(&[cf_name])
949            .set(
950                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
951                    .unwrap_or(METRICS_ERROR),
952            );
953    }
954
    /// Writes a consistent checkpoint (point-in-time snapshot) of the whole
    /// underlying database to `path`, delegating to the RocksDB checkpoint
    /// mechanism.
    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }
958
959    pub fn table_summary(&self) -> eyre::Result<TableSummary>
960    where
961        K: Serialize + DeserializeOwned,
962        V: Serialize + DeserializeOwned,
963    {
964        let mut num_keys = 0;
965        let mut key_bytes_total = 0;
966        let mut value_bytes_total = 0;
967        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
968        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
969        for item in self.safe_iter() {
970            let (key, value) = item?;
971            num_keys += 1;
972            let key_len = be_fix_int_ser(key.borrow())?.len();
973            let value_len = bcs::to_bytes(value.borrow())?.len();
974            key_bytes_total += key_len;
975            value_bytes_total += value_len;
976            key_hist.record(key_len as u64)?;
977            value_hist.record(value_len as u64)?;
978        }
979        Ok(TableSummary {
980            num_keys,
981            key_bytes_total,
982            value_bytes_total,
983            key_hist,
984            value_hist,
985        })
986    }
987
988    // Creates metrics and context for tracking an iterator usage and performance.
989    fn create_iter_context(
990        &self,
991    ) -> (
992        Option<HistogramTimer>,
993        Option<Histogram>,
994        Option<Histogram>,
995        Option<RocksDBPerfContext>,
996    ) {
997        let timer = self
998            .db_metrics
999            .op_metrics
1000            .rocksdb_iter_latency_seconds
1001            .with_label_values(&[&self.cf])
1002            .start_timer();
1003        let bytes_scanned = self
1004            .db_metrics
1005            .op_metrics
1006            .rocksdb_iter_bytes
1007            .with_label_values(&[&self.cf]);
1008        let keys_scanned = self
1009            .db_metrics
1010            .op_metrics
1011            .rocksdb_iter_keys
1012            .with_label_values(&[&self.cf]);
1013        let perf_ctx = if self.iter_sample_interval.sample() {
1014            Some(RocksDBPerfContext)
1015        } else {
1016            None
1017        };
1018        (
1019            Some(timer),
1020            Some(bytes_scanned),
1021            Some(keys_scanned),
1022            perf_ctx,
1023        )
1024    }
1025
1026    // Creates a RocksDB read option with specified lower and upper bounds.
1027    /// Lower bound is inclusive, and upper bound is exclusive.
1028    fn create_read_options_with_bounds(
1029        &self,
1030        lower_bound: Option<K>,
1031        upper_bound: Option<K>,
1032    ) -> ReadOptions
1033    where
1034        K: Serialize,
1035    {
1036        let mut readopts = self.opts.readopts();
1037        if let Some(lower_bound) = lower_bound {
1038            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
1039            readopts.set_iterate_lower_bound(key_buf);
1040        }
1041        if let Some(upper_bound) = upper_bound {
1042            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
1043            readopts.set_iterate_upper_bound(key_buf);
1044        }
1045        readopts
1046    }
1047
1048    /// Creates a safe reversed iterator with optional bounds.
1049    /// Upper bound is included.
1050    pub fn reversed_safe_iter_with_bounds(
1051        &self,
1052        lower_bound: Option<K>,
1053        upper_bound: Option<K>,
1054    ) -> Result<SafeRevIter<'_, K, V>, TypedStoreError>
1055    where
1056        K: Serialize + DeserializeOwned,
1057        V: Serialize + DeserializeOwned,
1058    {
1059        let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1060        let readopts = self.create_read_options_with_range((
1061            lower_bound
1062                .as_ref()
1063                .map(Bound::Included)
1064                .unwrap_or(Bound::Unbounded),
1065            upper_bound
1066                .as_ref()
1067                .map(Bound::Included)
1068                .unwrap_or(Bound::Unbounded),
1069        ));
1070
1071        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
1072        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1073        let iter = SafeIter::new(
1074            self.cf.clone(),
1075            db_iter,
1076            _timer,
1077            _perf_ctx,
1078            bytes_scanned,
1079            keys_scanned,
1080            Some(self.db_metrics.clone()),
1081        );
1082        Ok(SafeRevIter::new(iter, upper_bound_key.transpose()?))
1083    }
1084
    /// Creates a RocksDB read option with lower and upper bounds set
    /// corresponding to `range`.
    ///
    /// RocksDB iterate bounds are lower-inclusive and upper-exclusive, so
    /// `Excluded` lower bounds and `Included` upper bounds are emulated by
    /// incrementing the serialized key by one.
    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                // Rocksdb lower bound is inclusive by default so nothing to do
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                // The Rocksdb lower bound is inclusive, so to emulate an
                // exclusive bound we increment the serialized key by one.
                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                // If the key is already at the limit, there's nowhere else to go, so no upper
                // bound (an unbounded iterator still includes the max key).
                if !is_max(&key_buf) {
                    // The Rocksdb upper bound is exclusive, so to include this
                    // key we increment the serialized key by one.
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                // Rocksdb upper bound is exclusive by default so nothing to do
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
1136}
1137
/// Provides a mutable struct to form a collection of database write operations,
/// and execute them.
///
/// Batching write and delete operations is faster than performing them one by
/// one and ensures their atomicity, i.e. they are all written or none is.
/// This is also true of operations across column families in the same database.
///
/// Serializations / Deserialization, and naming of column families is performed
/// by passing a DBMap<K,V> with each operation.
///
/// ```
/// use core::fmt::Error;
/// use std::sync::Arc;
///
/// use prometheus::Registry;
/// use tempfile::tempdir;
/// use typed_store::{Map, metrics::DBMetrics, rocks::*};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let rocks = open_cf(
///         tempfile::tempdir().unwrap(),
///         None,
///         MetricConf::default(),
///         &["First_CF", "Second_CF"],
///     )
///     .unwrap();
///
///     let db_cf_1 = DBMap::reopen(
///         &rocks,
///         Some("First_CF"),
///         &ReadWriteOptions::default(),
///         false,
///     )
///     .expect("Failed to open storage");
///     let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
///     let db_cf_2 = DBMap::reopen(
///         &rocks,
///         Some("Second_CF"),
///         &ReadWriteOptions::default(),
///         false,
///     )
///     .expect("Failed to open storage");
///     let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
///     let mut batch = db_cf_1.batch();
///     batch
///         .insert_batch(&db_cf_1, keys_vals_1.clone())
///         .expect("Failed to batch insert")
///         .insert_batch(&db_cf_2, keys_vals_2.clone())
///         .expect("Failed to batch insert");
///
///     let _ = batch.write().expect("Failed to execute batch");
///     for (k, v) in keys_vals_1 {
///         let val = db_cf_1.get(&k).expect("Failed to get inserted key");
///         assert_eq!(Some(v), val);
///     }
///
///     for (k, v) in keys_vals_2 {
///         let val = db_cf_2.get(&k).expect("Failed to get inserted key");
///         assert_eq!(Some(v), val);
///     }
///     Ok(())
/// }
/// ```
pub struct DBBatch {
    // Database this batch commits to; every staged operation must target it.
    rocksdb: Arc<RocksDB>,
    // Underlying RocksDB write batch holding the staged operations.
    batch: rocksdb::WriteBatch,
    // Write options (e.g. sync-to-disk) applied when the batch is committed.
    opts: WriteOptions,
    // Metrics sink for commit latency and size.
    db_metrics: Arc<DBMetrics>,
    // Sampling policy deciding when to collect a RocksDB perf context.
    write_sample_interval: SamplingInterval,
}
1211
1212impl DBBatch {
1213    /// Create a new batch associated with a DB reference.
1214    ///
1215    /// Use `open_cf` to get the DB reference or an existing open database.
1216    pub fn new(
1217        dbref: &Arc<RocksDB>,
1218        batch: rocksdb::WriteBatch,
1219        opts: WriteOptions,
1220        db_metrics: &Arc<DBMetrics>,
1221        write_sample_interval: &SamplingInterval,
1222    ) -> Self {
1223        DBBatch {
1224            rocksdb: dbref.clone(),
1225            batch,
1226            opts,
1227            db_metrics: db_metrics.clone(),
1228            write_sample_interval: write_sample_interval.clone(),
1229        }
1230    }
1231
1232    /// Consume the batch and write its operations to the database
1233    #[instrument(level = "trace", skip_all, err)]
1234    pub fn write(self) -> Result<(), TypedStoreError> {
1235        let db_name = self.rocksdb.db_name();
1236        let timer = self
1237            .db_metrics
1238            .op_metrics
1239            .rocksdb_batch_commit_latency_seconds
1240            .with_label_values(&[&db_name])
1241            .start_timer();
1242        let batch_size = self.batch.size_in_bytes();
1243
1244        let perf_ctx = if self.write_sample_interval.sample() {
1245            Some(RocksDBPerfContext)
1246        } else {
1247            None
1248        };
1249        self.rocksdb.write(self.batch, &self.opts)?;
1250        self.db_metrics
1251            .op_metrics
1252            .rocksdb_batch_commit_bytes
1253            .with_label_values(&[&db_name])
1254            .observe(batch_size as f64);
1255
1256        if perf_ctx.is_some() {
1257            self.db_metrics
1258                .write_perf_ctx_metrics
1259                .report_metrics(&db_name);
1260        }
1261        let elapsed = timer.stop_and_record();
1262        if elapsed > 1.0 {
1263            warn!(?elapsed, ?db_name, "very slow batch write");
1264            self.db_metrics
1265                .op_metrics
1266                .rocksdb_very_slow_batch_writes_count
1267                .with_label_values(&[&db_name])
1268                .inc();
1269            self.db_metrics
1270                .op_metrics
1271                .rocksdb_very_slow_batch_writes_duration_ms
1272                .with_label_values(&[&db_name])
1273                .inc_by((elapsed * 1000.0) as u64);
1274        }
1275        Ok(())
1276    }
1277
1278    pub fn size_in_bytes(&self) -> usize {
1279        self.batch.size_in_bytes()
1280    }
1281}
1282
1283impl DBBatch {
1284    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1285        &mut self,
1286        db: &DBMap<K, V>,
1287        purged_vals: impl IntoIterator<Item = J>,
1288    ) -> Result<(), TypedStoreError> {
1289        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1290            return Err(TypedStoreError::CrossDBBatch);
1291        }
1292
1293        purged_vals
1294            .into_iter()
1295            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1296                let k_buf = be_fix_int_ser(k.borrow())?;
1297                self.batch.delete_cf(&db.cf(), k_buf);
1298
1299                Ok(())
1300            })?;
1301        Ok(())
1302    }
1303
1304    /// Deletes a range of keys between `from` (inclusive) and `to`
1305    /// (non-inclusive) by writing a range delete tombstone in the db map
1306    /// If the DBMap is configured with ignore_range_deletions set to false,
1307    /// the effect of this write will be visible immediately i.e. you won't
1308    /// see old values when you do a lookup or scan. But if it is configured
1309    /// with ignore_range_deletions set to true, the old value are visible until
1310    /// compaction actually deletes them which will happen sometime after. By
1311    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1312    /// overridden in the config), so please use this function with caution
1313    pub fn schedule_delete_range<K: Serialize, V>(
1314        &mut self,
1315        db: &DBMap<K, V>,
1316        from: &K,
1317        to: &K,
1318    ) -> Result<(), TypedStoreError> {
1319        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1320            return Err(TypedStoreError::CrossDBBatch);
1321        }
1322
1323        let from_buf = be_fix_int_ser(from)?;
1324        let to_buf = be_fix_int_ser(to)?;
1325
1326        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf);
1327        Ok(())
1328    }
1329
1330    /// inserts a range of (key, value) pairs given as an iterator
1331    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1332        &mut self,
1333        db: &DBMap<K, V>,
1334        new_vals: impl IntoIterator<Item = (J, U)>,
1335    ) -> Result<&mut Self, TypedStoreError> {
1336        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1337            return Err(TypedStoreError::CrossDBBatch);
1338        }
1339        let mut total = 0usize;
1340        new_vals
1341            .into_iter()
1342            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1343                let k_buf = be_fix_int_ser(k.borrow())?;
1344                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1345                total += k_buf.len() + v_buf.len();
1346                self.batch.put_cf(&db.cf(), k_buf, v_buf);
1347                Ok(())
1348            })?;
1349        self.db_metrics
1350            .op_metrics
1351            .rocksdb_batch_put_bytes
1352            .with_label_values(&[&db.cf])
1353            .observe(total as f64);
1354        Ok(self)
1355    }
1356}
1357
/// Raw (byte-level, unserialized) RocksDB iterator over a multi-threaded
/// database handle.
pub type RocksDBRawIter<'a> =
    rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>;
1360
1361impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1362where
1363    K: Serialize + DeserializeOwned,
1364    V: Serialize + DeserializeOwned,
1365{
1366    type Error = TypedStoreError;
1367    type SafeIterator = SafeIter<'a, K, V>;
1368
1369    #[instrument(level = "trace", skip_all, err)]
1370    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1371        let key_buf = be_fix_int_ser(key)?;
1372        // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
1373        // but no false negatives. We use it to short-circuit the absent case
1374        let readopts = self.opts.readopts();
1375        Ok(self
1376            .rocksdb
1377            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1378            && self
1379                .rocksdb
1380                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1381                .map_err(typed_store_err_from_rocks_err)?
1382                .is_some())
1383    }
1384
1385    #[instrument(level = "trace", skip_all, err)]
1386    fn multi_contains_keys<J>(
1387        &self,
1388        keys: impl IntoIterator<Item = J>,
1389    ) -> Result<Vec<bool>, Self::Error>
1390    where
1391        J: Borrow<K>,
1392    {
1393        let values = self.multi_get_pinned(keys)?;
1394        Ok(values.into_iter().map(|v| v.is_some()).collect())
1395    }
1396
1397    #[instrument(level = "trace", skip_all, err)]
1398    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1399        let _timer = self
1400            .db_metrics
1401            .op_metrics
1402            .rocksdb_get_latency_seconds
1403            .with_label_values(&[&self.cf])
1404            .start_timer();
1405        let perf_ctx = if self.get_sample_interval.sample() {
1406            Some(RocksDBPerfContext)
1407        } else {
1408            None
1409        };
1410        let key_buf = be_fix_int_ser(key)?;
1411        let res = self
1412            .rocksdb
1413            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1414            .map_err(typed_store_err_from_rocks_err)?;
1415        self.db_metrics
1416            .op_metrics
1417            .rocksdb_get_bytes
1418            .with_label_values(&[&self.cf])
1419            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1420        if perf_ctx.is_some() {
1421            self.db_metrics
1422                .read_perf_ctx_metrics
1423                .report_metrics(&self.cf);
1424        }
1425        match res {
1426            Some(data) => Ok(Some(
1427                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1428            )),
1429            None => Ok(None),
1430        }
1431    }
1432
1433    #[instrument(level = "trace", skip_all, err)]
1434    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1435        let timer = self
1436            .db_metrics
1437            .op_metrics
1438            .rocksdb_put_latency_seconds
1439            .with_label_values(&[&self.cf])
1440            .start_timer();
1441        let perf_ctx = if self.write_sample_interval.sample() {
1442            Some(RocksDBPerfContext)
1443        } else {
1444            None
1445        };
1446        let key_buf = be_fix_int_ser(key)?;
1447        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1448        self.db_metrics
1449            .op_metrics
1450            .rocksdb_put_bytes
1451            .with_label_values(&[&self.cf])
1452            .observe((key_buf.len() + value_buf.len()) as f64);
1453        if perf_ctx.is_some() {
1454            self.db_metrics
1455                .write_perf_ctx_metrics
1456                .report_metrics(&self.cf);
1457        }
1458        self.rocksdb
1459            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
1460            .map_err(typed_store_err_from_rocks_err)?;
1461
1462        let elapsed = timer.stop_and_record();
1463        if elapsed > 1.0 {
1464            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1465            self.db_metrics
1466                .op_metrics
1467                .rocksdb_very_slow_puts_count
1468                .with_label_values(&[&self.cf])
1469                .inc();
1470            self.db_metrics
1471                .op_metrics
1472                .rocksdb_very_slow_puts_duration_ms
1473                .with_label_values(&[&self.cf])
1474                .inc_by((elapsed * 1000.0) as u64);
1475        }
1476
1477        Ok(())
1478    }
1479
1480    #[instrument(level = "trace", skip_all, err)]
1481    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1482        let _timer = self
1483            .db_metrics
1484            .op_metrics
1485            .rocksdb_delete_latency_seconds
1486            .with_label_values(&[&self.cf])
1487            .start_timer();
1488        let perf_ctx = if self.write_sample_interval.sample() {
1489            Some(RocksDBPerfContext)
1490        } else {
1491            None
1492        };
1493        let key_buf = be_fix_int_ser(key)?;
1494        self.rocksdb
1495            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
1496            .map_err(typed_store_err_from_rocks_err)?;
1497        self.db_metrics
1498            .op_metrics
1499            .rocksdb_deletes
1500            .with_label_values(&[&self.cf])
1501            .inc();
1502        if perf_ctx.is_some() {
1503            self.db_metrics
1504                .write_perf_ctx_metrics
1505                .report_metrics(&self.cf);
1506        }
1507        Ok(())
1508    }
1509
1510    /// This method first drops the existing column family and then creates a
1511    /// new one with the same name. The two operations are not atomic and
1512    /// hence it is possible to get into a race condition where the column
1513    /// family has been dropped but new one is not created yet
1514    #[instrument(level = "trace", skip_all, err)]
1515    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
1516        let _ = self.rocksdb.drop_cf(&self.cf);
1517        self.rocksdb
1518            .create_cf(self.cf.clone(), &default_db_options().options)
1519            .map_err(typed_store_err_from_rocks_err)?;
1520        Ok(())
1521    }
1522
1523    /// Writes a range delete tombstone to delete all entries in the db map
1524    /// If the DBMap is configured with ignore_range_deletions set to false,
1525    /// the effect of this write will be visible immediately i.e. you won't
1526    /// see old values when you do a lookup or scan. But if it is configured
1527    /// with ignore_range_deletions set to true, the old value are visible until
1528    /// compaction actually deletes them which will happen sometime after. By
1529    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1530    /// overridden in the config), so please use this function with caution
1531    #[instrument(level = "trace", skip_all, err)]
1532    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1533        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1534        let last_key = self
1535            .reversed_safe_iter_with_bounds(None, None)?
1536            .next()
1537            .transpose()?
1538            .map(|(k, _v)| k);
1539        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1540            let mut batch = self.batch();
1541            batch.schedule_delete_range(self, &first_key, &last_key)?;
1542            batch.write()?;
1543        }
1544        Ok(())
1545    }
1546
1547    fn is_empty(&self) -> bool {
1548        self.safe_iter().next().is_none()
1549    }
1550
1551    fn safe_iter(&'a self) -> Self::SafeIterator {
1552        let db_iter = self
1553            .rocksdb
1554            .raw_iterator_cf(&self.cf(), self.opts.readopts());
1555        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1556        SafeIter::new(
1557            self.cf.clone(),
1558            db_iter,
1559            _timer,
1560            _perf_ctx,
1561            bytes_scanned,
1562            keys_scanned,
1563            Some(self.db_metrics.clone()),
1564        )
1565    }
1566
1567    fn safe_iter_with_bounds(
1568        &'a self,
1569        lower_bound: Option<K>,
1570        upper_bound: Option<K>,
1571    ) -> Self::SafeIterator {
1572        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
1573        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
1574        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1575        SafeIter::new(
1576            self.cf.clone(),
1577            db_iter,
1578            _timer,
1579            _perf_ctx,
1580            bytes_scanned,
1581            keys_scanned,
1582            Some(self.db_metrics.clone()),
1583        )
1584    }
1585
1586    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
1587        let readopts = self.create_read_options_with_range(range);
1588        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
1589        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1590        SafeIter::new(
1591            self.cf.clone(),
1592            db_iter,
1593            _timer,
1594            _perf_ctx,
1595            bytes_scanned,
1596            keys_scanned,
1597            Some(self.db_metrics.clone()),
1598        )
1599    }
1600
1601    /// Returns a vector of values corresponding to the keys provided.
1602    #[instrument(level = "trace", skip_all, err)]
1603    fn multi_get<J>(
1604        &self,
1605        keys: impl IntoIterator<Item = J>,
1606    ) -> Result<Vec<Option<V>>, TypedStoreError>
1607    where
1608        J: Borrow<K>,
1609    {
1610        let results = self.multi_get_pinned(keys)?;
1611        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1612            .into_iter()
1613            .map(|value_byte| match value_byte {
1614                Some(data) => Ok(Some(
1615                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1616                )),
1617                None => Ok(None),
1618            })
1619            .collect();
1620
1621        values_parsed
1622    }
1623
1624    /// Convenience method for batch insertion
1625    #[instrument(level = "trace", skip_all, err)]
1626    fn multi_insert<J, U>(
1627        &self,
1628        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1629    ) -> Result<(), Self::Error>
1630    where
1631        J: Borrow<K>,
1632        U: Borrow<V>,
1633    {
1634        let mut batch = self.batch();
1635        batch.insert_batch(self, key_val_pairs)?;
1636        batch.write()
1637    }
1638
1639    /// Convenience method for batch removal
1640    #[instrument(level = "trace", skip_all, err)]
1641    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1642    where
1643        J: Borrow<K>,
1644    {
1645        let mut batch = self.batch();
1646        batch.delete_batch(self, keys)?;
1647        batch.write()
1648    }
1649
1650    /// Try to catch up with primary when running as secondary
1651    #[instrument(level = "trace", skip_all, err)]
1652    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1653        self.rocksdb
1654            .try_catch_up_with_primary()
1655            .map_err(typed_store_err_from_rocks_err)
1656    }
1657}
1658
1659pub fn read_size_from_env(var_name: &str) -> Option<usize> {
1660    env::var(var_name)
1661        .ok()?
1662        .parse::<usize>()
1663        .tap_err(|e| {
1664            warn!(
1665                "Env var {} does not contain valid usize integer: {}",
1666                var_name, e
1667            )
1668        })
1669        .ok()
1670}
1671
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    // Whether reads skip over range-delete tombstones (see `readopts`).
    // Defaults to true (see the `Default` impl).
    pub ignore_range_deletions: bool,
    // Whether to sync to disk on every write.
    sync_to_disk: bool,
}
1678
1679impl ReadWriteOptions {
1680    pub fn readopts(&self) -> ReadOptions {
1681        let mut readopts = ReadOptions::default();
1682        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
1683        readopts
1684    }
1685
1686    pub fn writeopts(&self) -> WriteOptions {
1687        let mut opts = WriteOptions::default();
1688        opts.set_sync(self.sync_to_disk);
1689        opts
1690    }
1691
1692    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
1693        self.ignore_range_deletions = ignore;
1694        self
1695    }
1696}
1697
1698impl Default for ReadWriteOptions {
1699    fn default() -> Self {
1700        Self {
1701            ignore_range_deletions: true,
1702            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
1703        }
1704    }
1705}
// TODO: refactor this into a builder pattern, where rocksdb::Options are
// generated after a call to build().
/// Bundle of raw RocksDB tuning options plus the typed-store read/write
/// options that accompany them when opening a database or column family.
#[derive(Default, Clone)]
pub struct DBOptions {
    // Raw RocksDB options applied when opening the database/column family.
    pub options: rocksdb::Options,
    // Read/write behavior options (range-deletion handling, sync policy).
    pub rw_options: ReadWriteOptions,
}
1713
1714impl DBOptions {
    // Optimize lookup perf for tables where no scans are performed.
    // If non-trivial number of values can be > 512B in size, it is beneficial to
    // also specify optimize_for_large_values_no_scan().
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        // NOTE: this overwrites the block options (RocksDB installs its own
        // block-based table configuration sized by the given cache -- see the
        // rocksdb crate docs for `optimize_for_point_lookup`).
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }
1724
1725    // Optimize write and lookup perf for tables which are rarely scanned, and have
1726    // large values. https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
1727    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
1728        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
1729            info!("Large value blob storage optimization is disabled via env var.");
1730            return self;
1731        }
1732
1733        // Blob settings.
1734        self.options.set_enable_blob_files(true);
1735        self.options
1736            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
1737        self.options.set_enable_blob_gc(true);
1738        // Since each blob can have non-trivial size overhead, and compression does not
1739        // work across blobs, set a min blob size in bytes to so small
1740        // transactions and effects are kept in sst files.
1741        self.options.set_min_blob_size(min_blob_size);
1742
1743        // Increase write buffer size to 256MiB.
1744        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
1745            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
1746            * 1024
1747            * 1024;
1748        self.options.set_write_buffer_size(write_buffer_size);
1749        // Since large blobs are not in sst files, reduce the target file size and base
1750        // level target size.
1751        let target_file_size_base = 64 << 20;
1752        self.options
1753            .set_target_file_size_base(target_file_size_base);
1754        // Level 1 default to 64MiB * 4 ~ 256MiB.
1755        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
1756            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
1757        self.options
1758            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
1759
1760        self
1761    }
1762
1763    // Optimize tables with a mix of lookup and scan workloads.
1764    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
1765        self.options
1766            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
1767        self
1768    }
1769
1770    // Optimize DB receiving significant insertions.
1771    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
1772        self.options
1773            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
1774        self.options
1775            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
1776        self
1777    }
1778
1779    // Optimize tables receiving significant insertions.
1780    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
1781        // Increase write buffer size to 256MiB.
1782        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
1783            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
1784            * 1024
1785            * 1024;
1786        self.options.set_write_buffer_size(write_buffer_size);
1787        // Increase write buffers to keep to 6 before slowing down writes.
1788        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
1789            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
1790        self.options
1791            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
1792        // Keep 1 write buffer so recent writes can be read from memory.
1793        self.options
1794            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
1795
1796        // Increase compaction trigger for level 0 to 6.
1797        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
1798            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
1799        self.options.set_level_zero_file_num_compaction_trigger(
1800            max_level_zero_file_num.try_into().unwrap(),
1801        );
1802        self.options.set_level_zero_slowdown_writes_trigger(
1803            (max_level_zero_file_num * 12).try_into().unwrap(),
1804        );
1805        self.options
1806            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
1807
1808        // Increase sst file size to 128MiB.
1809        self.options.set_target_file_size_base(
1810            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
1811                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
1812                * 1024
1813                * 1024,
1814        );
1815
1816        // Increase level 1 target size to 256MiB * 6 ~ 1.5GiB.
1817        self.options
1818            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
1819
1820        self
1821    }
1822
1823    // Optimize tables receiving significant insertions, without any deletions.
1824    // TODO: merge this function with optimize_for_write_throughput(), and use a
1825    // flag to indicate if deletion is received.
1826    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
1827        // Increase write buffer size to 256MiB.
1828        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
1829            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
1830            * 1024
1831            * 1024;
1832        self.options.set_write_buffer_size(write_buffer_size);
1833        // Increase write buffers to keep to 6 before slowing down writes.
1834        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
1835            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
1836        self.options
1837            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
1838        // Keep 1 write buffer so recent writes can be read from memory.
1839        self.options
1840            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
1841
1842        // Switch to universal compactions.
1843        self.options
1844            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
1845        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
1846        compaction_options.set_max_size_amplification_percent(10000);
1847        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
1848        self.options
1849            .set_universal_compaction_options(&compaction_options);
1850
1851        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
1852            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
1853        self.options.set_level_zero_file_num_compaction_trigger(
1854            max_level_zero_file_num.try_into().unwrap(),
1855        );
1856        self.options.set_level_zero_slowdown_writes_trigger(
1857            (max_level_zero_file_num * 12).try_into().unwrap(),
1858        );
1859        self.options
1860            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
1861
1862        // Increase sst file size to 128MiB.
1863        self.options.set_target_file_size_base(
1864            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
1865                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
1866                * 1024
1867                * 1024,
1868        );
1869
1870        // This should be a no-op for universal compaction but increasing it to be safe.
1871        self.options
1872            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
1873
1874        self
1875    }
1876
1877    // Overrides the block options with different block cache size and block size.
1878    pub fn set_block_options(
1879        mut self,
1880        block_cache_size_mb: usize,
1881        block_size_bytes: usize,
1882    ) -> DBOptions {
1883        self.options
1884            .set_block_based_table_factory(&get_block_options(
1885                block_cache_size_mb,
1886                block_size_bytes,
1887            ));
1888        self
1889    }
1890
1891    // Disables write stalling and stopping based on pending compaction bytes.
1892    pub fn disable_write_throttling(mut self) -> DBOptions {
1893        self.options.set_soft_pending_compaction_bytes_limit(0);
1894        self.options.set_hard_pending_compaction_bytes_limit(0);
1895        self
1896    }
1897}
1898
1899/// Creates a default RocksDB option, to be used when RocksDB option is
1900/// unspecified.
1901pub fn default_db_options() -> DBOptions {
1902    let mut opt = rocksdb::Options::default();
1903
1904    // One common issue when running tests on Mac is that the default ulimit is too
1905    // low, leading to I/O errors such as "Too many open files". Raising fdlimit
1906    // to bypass it.
1907    if let Some(limit) = fdlimit::raise_fd_limit() {
1908        // on windows raise_fd_limit return None
1909        opt.set_max_open_files((limit / 8) as i32);
1910    }
1911
1912    // The table cache is locked for updates and this determines the number
1913    // of shards, ie 2^10. Increase in case of lock contentions.
1914    opt.set_table_cache_num_shard_bits(10);
1915
1916    // LSM compression settings
1917    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
1918    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
1919    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
1920
1921    // IOTA uses multiple RocksDB in a node, so total sizes of write buffers and WAL
1922    // can be higher than the limits below.
1923    //
1924    // RocksDB also exposes the option to configure total write buffer size across
1925    // multiple instances via `write_buffer_manager`. But the write buffer flush
1926    // policy (flushing the buffer receiving the next write) may not work well.
1927    // So sticking to per-db write buffer size limit for now.
1928    //
1929    // The environment variables are only meant to be emergency overrides. They may
1930    // go away in future. It is preferable to update the default value, or
1931    // override the option in code.
1932    opt.set_db_write_buffer_size(
1933        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
1934            * 1024
1935            * 1024,
1936    );
1937    opt.set_max_total_wal_size(
1938        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
1939    );
1940
1941    // Num threads for compactions and memtable flushes.
1942    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
1943
1944    opt.set_enable_pipelined_write(true);
1945
1946    // Increase block size to 16KiB.
1947    // https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md#indexes-and-filter-blocks
1948    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
1949
1950    // Set memtable bloomfilter.
1951    opt.set_memtable_prefix_bloom_ratio(0.02);
1952
1953    DBOptions {
1954        options: opt,
1955        rw_options: ReadWriteOptions::default(),
1956    }
1957}
1958
1959fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
1960    // Set options mostly similar to those used in optimize_for_point_lookup(),
1961    // except non-default binary and hash index, to hopefully reduce lookup
1962    // latencies without causing any regression for scanning, with slightly more
1963    // memory usages. https://github.com/facebook/rocksdb/blob/11cb6af6e5009c51794641905ca40ce5beec7fee/options/options.cc#L611-L621
1964    let mut block_options = BlockBasedOptions::default();
1965    // Overrides block size.
1966    block_options.set_block_size(block_size_bytes);
1967    // Configure a block cache.
1968    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
1969    // Set a bloomfilter with 1% false positive rate.
1970    block_options.set_bloom_filter(10.0, false);
1971    // From https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#caching-index-and-filter-blocks
1972    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
1973    block_options
1974}
1975
1976/// Opens a database with options, and a number of column families that are
1977/// created if they do not exist.
1978#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
1979pub fn open_cf<P: AsRef<Path>>(
1980    path: P,
1981    db_options: Option<rocksdb::Options>,
1982    metric_conf: MetricConf,
1983    opt_cfs: &[&str],
1984) -> Result<Arc<RocksDB>, TypedStoreError> {
1985    let options = db_options.unwrap_or_else(|| default_db_options().options);
1986    let column_descriptors: Vec<_> = opt_cfs
1987        .iter()
1988        .map(|name| (*name, options.clone()))
1989        .collect();
1990    open_cf_opts(
1991        path,
1992        Some(options.clone()),
1993        metric_conf,
1994        &column_descriptors[..],
1995    )
1996}
1997
1998fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
1999    // Customize database options
2000    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2001    options.create_if_missing(true);
2002    options.create_missing_column_families(true);
2003    options
2004}
2005
/// Opens a database with options, and a number of column families with
/// individual options that are created if they do not exist.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    // In the simulator, we intercept the wall clock in the test thread only. This
    // causes problems because rocksdb uses the simulated clock when creating
    // its background threads, but then those threads see the real wall clock
    // (because they are not the test thread), which causes rocksdb to panic.
    // The `nondeterministic` macro evaluates expressions in new threads, which
    // resolves the issue.
    //
    // This is a no-op in non-simulator builds.

    // Merge on-disk column families that the caller did not list (opened with
    // default options) into the requested set; RocksDB requires every existing
    // CF to be listed when opening.
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(RocksDB::new(
            rocksdb,
            metric_conf,
            PathBuf::from(path),
        )))
    })
}
2044
/// Opens the database at `primary_path` as a read-only secondary instance,
/// with a number of column families with individual options. The secondary
/// instance keeps its own files at `secondary_path` (defaulting to a
/// "SECONDARY" directory next to the primary) and is caught up with the
/// primary before being returned.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    // See comment above for explanation of why nondeterministic is necessary here.
    nondeterministic!({
        // Customize database options
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        // This is a requirement by RocksDB when opening as secondary
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        // Column families currently present in the primary database; an
        // unreadable/missing DB yields an empty list.
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add CFs not explicitly listed, using the default options, since all
        // existing CFs must be listed when opening.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        // Default the secondary location to a "SECONDARY" directory that is a
        // sibling of the primary directory.
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            // Replay the primary's WAL so the secondary starts out up-to-date.
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(RocksDB::new(rocksdb, metric_conf, secondary_path)))
    })
}
2105
2106pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2107    const DB_DEFAULT_CF_NAME: &str = "default";
2108
2109    let opts = rocksdb::Options::default();
2110    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2111        .map_err(|e| e.into())
2112        .map(|q| {
2113            q.iter()
2114                .filter_map(|s| {
2115                    // The `default` table is not used
2116                    if s != DB_DEFAULT_CF_NAME {
2117                        Some(s.clone())
2118                    } else {
2119                        None
2120                    }
2121                })
2122                .collect()
2123        })
2124}
2125
2126/// TODO: Good description of why we're doing this : RocksDB stores keys in BE and has a seek operator on iterators, see `https://github.com/facebook/rocksdb/wiki/Iterator#introduction`
2127#[inline]
2128pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2129where
2130    S: ?Sized + serde::Serialize,
2131{
2132    bincode::DefaultOptions::new()
2133        .with_big_endian()
2134        .with_fixint_encoding()
2135        .serialize(t)
2136        .map_err(typed_store_err_from_bincode_err)
2137}
2138
/// Mapping from table (column family) name to the `DBOptions` to open it with.
#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
    /// Wraps the given per-table options map.
    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
        Self(map)
    }

    /// Returns a clone of the underlying per-table options map.
    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
        self.0.clone()
    }
}
2150
/// Access mode for opening a RocksDB database.
pub enum RocksDBAccessType {
    /// Primary (read-write) instance.
    Primary,
    /// Read-only secondary instance. The optional path is presumably the
    /// secondary's own storage directory (cf. `open_cf_opts_secondary`, which
    /// defaults to a sibling "SECONDARY" directory when given `None`) —
    /// TODO confirm against the call sites.
    Secondary(Option<PathBuf>),
}
2155
2156// Drops a database if there is no other handle to it, with retries and timeout.
2157pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
2158    let mut backoff = backoff::ExponentialBackoff {
2159        max_elapsed_time: Some(timeout),
2160        ..Default::default()
2161    };
2162    loop {
2163        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2164            Ok(()) => return Ok(()),
2165            Err(err) => match backoff.next_backoff() {
2166                Some(duration) => tokio::time::sleep(duration).await,
2167                None => return Err(err),
2168            },
2169        }
2170    }
2171}
2172
2173fn populate_missing_cfs(
2174    input_cfs: &[(&str, rocksdb::Options)],
2175    path: &Path,
2176) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2177    let mut cfs = vec![];
2178    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2179    let existing_cfs =
2180        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2181            .ok()
2182            .unwrap_or_default();
2183
2184    for cf_name in existing_cfs {
2185        if !input_cf_index.contains(&cf_name[..]) {
2186            cfs.push((cf_name, rocksdb::Options::default()));
2187        }
2188    }
2189    cfs.extend(
2190        input_cfs
2191            .iter()
2192            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2193    );
2194    Ok(cfs)
2195}
2196
/// Given a byte slice interpreted as a big-endian unsigned integer, replace it
/// in place with that integer plus one, saturating at the maximum value
/// (all bytes 0xFF) instead of wrapping around to zero.
///
/// An empty slice is considered maximal and is left unchanged.
fn big_endian_saturating_add_one(v: &mut [u8]) {
    // Saturate: if every byte is already 0xFF there is no successor.
    // (Inlined equivalent of `is_max(v)`.)
    if v.iter().all(|&b| b == u8::MAX) {
        return;
    }
    // Propagate the carry from the least significant (last) byte upwards;
    // iterating instead of indexing avoids per-access bounds checks.
    for byte in v.iter_mut().rev() {
        if *byte == u8::MAX {
            *byte = 0;
        } else {
            *byte += 1;
            break;
        }
    }
}
2213
/// Returns `true` when every byte of `v` equals 0xFF, i.e. the slice read as
/// a big-endian unsigned integer holds its maximum value. Vacuously true for
/// an empty slice.
fn is_max(v: &[u8]) -> bool {
    !v.iter().any(|byte| *byte != u8::MAX)
}
2218
// `clippy::manual_div_ceil` is raised by code expanded by the
// `uint::construct_uint!` macro so it needs to be fixed by `uint`
#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
#[test]
fn test_helpers() {
    // An empty slice is vacuously "all 0xFF".
    let v = vec![];
    assert!(is_max(&v));

    // Cross-checks big_endian_saturating_add_one against 256-bit integer
    // arithmetic: incrementing the byte vector must match numeric +1.
    fn check_add(v: Vec<u8>) {
        let mut v = v;
        let num = Num32::from_big_endian(&v);
        big_endian_saturating_add_one(&mut v);
        assert!(num + 1 == Num32::from_big_endian(&v));
    }

    uint::construct_uint! {
        // 32 byte number
        struct Num32(4);
    }

    // Saturation: incrementing the all-0xFF vector leaves it at the maximum.
    let mut v = vec![255; 32];
    big_endian_saturating_add_one(&mut v);
    assert!(Num32::MAX == Num32::from_big_endian(&v));

    check_add(vec![1; 32]);
    check_add(vec![6; 32]);
    check_add(vec![254; 32]);

    // TBD: More tests coming with randomized arrays
}