typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod errors;
6pub(crate) mod options;
7pub(crate) mod rocks_util;
8pub(crate) mod safe_iter;
9
10use std::{
11    collections::HashSet,
12    ffi::CStr,
13    path::{Path, PathBuf},
14    sync::Arc,
15    time::Duration,
16};
17
18use backoff::backoff::Backoff;
19use iota_macros::nondeterministic;
20use rocksdb::{
21    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, properties,
22    properties::num_files_at_level,
23};
24use tracing::warn;
25use typed_store_error::TypedStoreError;
26
27pub use crate::{
28    database::{DBBatch, DBMap, MetricConf},
29    rocks::options::{
30        DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, list_tables,
31        read_size_from_env,
32    },
33};
34use crate::{
35    database::{Database, Storage},
36    metrics::DBMetrics,
37    rocks::errors::typed_store_err_from_rocks_err,
38};
39
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
/// Name of the RocksDB integer property used for the blob-file-size gauge.
///
/// The byte string is validated during constant evaluation, so a missing
/// terminator or an interior NUL byte fails the build instead of relying on
/// an unchecked `unsafe` conversion.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        Err(_) => panic!("property name must end with exactly one NUL byte"),
    };

/// Value written to a gauge when the corresponding RocksDB property cannot be
/// read.
const METRICS_ERROR: i64 = -1;

/// Key under which the corruption marker is stored: `[1]` means marked,
/// any other value means not marked.
const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";
48
49#[cfg(test)]
50mod tests;
51
// TODO: deprecate macros use
/// Reopens one `DBMap` per listed column family on an already opened
/// database and returns them as a tuple, in the order given.
///
/// Each entry is written as `"cf_name";<KeyType, ValueType>`. Every map is
/// reopened with `ReadWriteOptions::default()` and `false` as the last
/// argument of `DBMap::reopen`.
///
/// `DBMap` and `ReadWriteOptions` must be in scope at the call site.
///
/// # Panics
///
/// Panics with `Cannot open <cf> CF.` followed by the debug representation
/// of the error if a column family cannot be reopened.
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false)
                    // Build the panic message lazily so that the success
                    // path does not allocate a `String` it never uses.
                    .unwrap_or_else(|e| panic!("Cannot open {} CF.: {:?}", $cf, e))
            ),*
        )
    };
}
63
/// Owning wrapper around a multi-threaded RocksDB handle.
///
/// The wrapper exists so that a `Drop` implementation can be attached to the
/// handle.
#[derive(Debug)]
pub(crate) struct RocksDB {
    /// The wrapped RocksDB database handle.
    pub(crate) underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
68
69impl Drop for RocksDB {
70    fn drop(&mut self) {
71        self.underlying.cancel_all_background_work(/* wait */ true);
72    }
73}
74
75pub(crate) fn rocks_cf<'a>(
76    rocks_db: &'a RocksDB,
77    cf_name: &str,
78) -> Arc<rocksdb::BoundColumnFamily<'a>> {
79    rocks_db
80        .underlying
81        .cf_handle(cf_name)
82        .expect("Map-keying column family should have been checked at DB creation")
83}
84
85// Check if the database is corrupted, and if so, panic.
86// If the corrupted key is not set, we set it to [1].
87pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
88    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
89
90    db.get(DB_CORRUPTED_KEY)
91        .map_err(|e| format!("Failed to open database: {e}"))
92        .and_then(|value| match value {
93            Some(v) if v[0] == 1 => Err(
94                "Database is corrupted, please remove the current database and start clean!"
95                    .to_string(),
96            ),
97            Some(_) => Ok(()),
98            None => db
99                .put(DB_CORRUPTED_KEY, [1])
100                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
101        })?;
102
103    Ok(())
104}
105
106pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
107    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
108}
109
110/// Opens a database with options, and a number of column families with
111/// individual options that are created if they do not exist.
112#[tracing::instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
113pub fn open_cf_opts<P: AsRef<Path>>(
114    path: P,
115    db_options: Option<rocksdb::Options>,
116    metric_conf: MetricConf,
117    opt_cfs: &[(&str, rocksdb::Options)],
118) -> Result<Arc<Database>, TypedStoreError> {
119    let path = path.as_ref();
120    // In the simulator, we intercept the wall clock in the test thread only. This
121    // causes problems because rocksdb uses the simulated clock when creating
122    // its background threads, but then those threads see the real wall clock
123    // (because they are not the test thread), which causes rocksdb to panic.
124    // The `nondeterministic` macro evaluates expressions in new threads, which
125    // resolves the issue.
126    //
127    // This is a no-op in non-simulator builds.
128
129    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
130    nondeterministic!({
131        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
132        options.create_if_missing(true);
133        options.create_missing_column_families(true);
134        let rocksdb = {
135            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
136                &options,
137                path,
138                cfs.into_iter()
139                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
140            )
141            .map_err(typed_store_err_from_rocks_err)?
142        };
143        Ok(Arc::new(Database::new(
144            Storage::Rocks(RocksDB {
145                underlying: rocksdb,
146            }),
147            metric_conf,
148        )))
149    })
150}
151
152/// Opens a database with options, and a number of column families with
153/// individual options that are created if they do not exist.
154pub fn open_cf_opts_secondary<P: AsRef<Path>>(
155    primary_path: P,
156    secondary_path: Option<P>,
157    db_options: Option<rocksdb::Options>,
158    metric_conf: MetricConf,
159    opt_cfs: &[(&str, rocksdb::Options)],
160) -> Result<Arc<Database>, TypedStoreError> {
161    let primary_path = primary_path.as_ref();
162    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
163    // See comment above for explanation of why nondeterministic is necessary here.
164    nondeterministic!({
165        // Customize database options
166        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
167
168        fdlimit::raise_fd_limit();
169        // This is a requirement by RocksDB when opening as secondary
170        options.set_max_open_files(-1);
171
172        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
173        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
174            .ok()
175            .unwrap_or_default();
176
177        let default_db_options = default_db_options();
178        // Add CFs not explicitly listed
179        for cf_key in cfs.iter() {
180            if !opt_cfs.contains_key(&cf_key[..]) {
181                opt_cfs.insert(cf_key, default_db_options.options.clone());
182            }
183        }
184
185        let primary_path = primary_path.to_path_buf();
186        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
187            let mut s = primary_path.clone();
188            s.pop();
189            s.push("SECONDARY");
190            s.as_path().to_path_buf()
191        });
192
193        let rocksdb = {
194            options.create_if_missing(true);
195            options.create_missing_column_families(true);
196            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
197                &options,
198                &primary_path,
199                &secondary_path,
200                opt_cfs
201                    .iter()
202                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
203            )
204            .map_err(typed_store_err_from_rocks_err)?;
205            db.try_catch_up_with_primary()
206                .map_err(typed_store_err_from_rocks_err)?;
207            db
208        };
209        Ok(Arc::new(Database::new(
210            Storage::Rocks(RocksDB {
211                underlying: rocksdb,
212            }),
213            metric_conf,
214        )))
215    })
216}
217
218// Drops a database if there is no other handle to it, with retries and timeout.
219pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
220    let mut backoff = backoff::ExponentialBackoff {
221        max_elapsed_time: Some(timeout),
222        ..Default::default()
223    };
224    loop {
225        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
226            Ok(()) => return Ok(()),
227            Err(err) => match backoff.next_backoff() {
228                Some(duration) => tokio::time::sleep(duration).await,
229                None => return Err(err),
230            },
231        }
232    }
233}
234
235fn populate_missing_cfs(
236    input_cfs: &[(&str, rocksdb::Options)],
237    path: &Path,
238) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
239    let mut cfs = vec![];
240    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
241    let existing_cfs =
242        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
243            .ok()
244            .unwrap_or_default();
245
246    for cf_name in existing_cfs {
247        if !input_cf_index.contains(&cf_name[..]) {
248            cfs.push((cf_name, rocksdb::Options::default()));
249        }
250    }
251    cfs.extend(
252        input_cfs
253            .iter()
254            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
255    );
256    Ok(cfs)
257}
258
259/// RocksDB-specific methods on `DBMap`. These are kept separate from the
260/// generic impl in `database.rs` because they directly access RocksDB
261/// internals and have no meaning for other storage backends.
262impl<K, V> DBMap<K, V> {
263    fn get_rocksdb_int_property(
264        rocksdb: &RocksDB,
265        cf: &impl AsColumnFamilyRef,
266        property_name: &CStr,
267    ) -> Result<i64, TypedStoreError> {
268        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
269            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
270            Ok(None) => Ok(0),
271            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
272        }
273    }
274
275    pub(crate) fn report_rocksdb_metrics(
276        database: &Arc<Database>,
277        cf_name: &str,
278        db_metrics: &Arc<DBMetrics>,
279    ) {
280        let Storage::Rocks(rocksdb) = &database.storage else {
281            return;
282        };
283
284        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
285            warn!(
286                "unable to report metrics for cf {cf_name:?} in db {:?}",
287                database.db_name()
288            );
289            return;
290        };
291
292        db_metrics
293            .cf_metrics
294            .rocksdb_total_sst_files_size
295            .with_label_values(&[cf_name])
296            .set(
297                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
298                    .unwrap_or(METRICS_ERROR),
299            );
300        db_metrics
301            .cf_metrics
302            .rocksdb_total_blob_files_size
303            .with_label_values(&[cf_name])
304            .set(
305                Self::get_rocksdb_int_property(
306                    rocksdb,
307                    &cf,
308                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
309                )
310                .unwrap_or(METRICS_ERROR),
311            );
312        // 7 is the default number of levels in RocksDB. If we ever change the number of
313        // levels using `set_num_levels`, we need to update here as well. Note
314        // that there isn't an API to query the DB to get the number of levels (yet).
315        let total_num_files: i64 = (0..=6)
316            .map(|level| {
317                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
318                    .unwrap_or(METRICS_ERROR)
319            })
320            .sum();
321        db_metrics
322            .cf_metrics
323            .rocksdb_total_num_files
324            .with_label_values(&[cf_name])
325            .set(total_num_files);
326        db_metrics
327            .cf_metrics
328            .rocksdb_num_level0_files
329            .with_label_values(&[cf_name])
330            .set(
331                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
332                    .unwrap_or(METRICS_ERROR),
333            );
334        db_metrics
335            .cf_metrics
336            .rocksdb_current_size_active_mem_tables
337            .with_label_values(&[cf_name])
338            .set(
339                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
340                    .unwrap_or(METRICS_ERROR),
341            );
342        db_metrics
343            .cf_metrics
344            .rocksdb_size_all_mem_tables
345            .with_label_values(&[cf_name])
346            .set(
347                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
348                    .unwrap_or(METRICS_ERROR),
349            );
350        db_metrics
351            .cf_metrics
352            .rocksdb_num_snapshots
353            .with_label_values(&[cf_name])
354            .set(
355                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
356                    .unwrap_or(METRICS_ERROR),
357            );
358        db_metrics
359            .cf_metrics
360            .rocksdb_oldest_snapshot_time
361            .with_label_values(&[cf_name])
362            .set(
363                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
364                    .unwrap_or(METRICS_ERROR),
365            );
366        db_metrics
367            .cf_metrics
368            .rocksdb_actual_delayed_write_rate
369            .with_label_values(&[cf_name])
370            .set(
371                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
372                    .unwrap_or(METRICS_ERROR),
373            );
374        db_metrics
375            .cf_metrics
376            .rocksdb_is_write_stopped
377            .with_label_values(&[cf_name])
378            .set(
379                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
380                    .unwrap_or(METRICS_ERROR),
381            );
382        db_metrics
383            .cf_metrics
384            .rocksdb_block_cache_capacity
385            .with_label_values(&[cf_name])
386            .set(
387                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
388                    .unwrap_or(METRICS_ERROR),
389            );
390        db_metrics
391            .cf_metrics
392            .rocksdb_block_cache_usage
393            .with_label_values(&[cf_name])
394            .set(
395                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
396                    .unwrap_or(METRICS_ERROR),
397            );
398        db_metrics
399            .cf_metrics
400            .rocksdb_block_cache_pinned_usage
401            .with_label_values(&[cf_name])
402            .set(
403                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
404                    .unwrap_or(METRICS_ERROR),
405            );
406        db_metrics
407            .cf_metrics
408            .rocksdb_estimate_table_readers_mem
409            .with_label_values(&[cf_name])
410            .set(
411                Self::get_rocksdb_int_property(
412                    rocksdb,
413                    &cf,
414                    properties::ESTIMATE_TABLE_READERS_MEM,
415                )
416                .unwrap_or(METRICS_ERROR),
417            );
418        db_metrics
419            .cf_metrics
420            .rocksdb_estimated_num_keys
421            .with_label_values(&[cf_name])
422            .set(
423                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
424                    .unwrap_or(METRICS_ERROR),
425            );
426        db_metrics
427            .cf_metrics
428            .rocksdb_num_immutable_mem_tables
429            .with_label_values(&[cf_name])
430            .set(
431                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
432                    .unwrap_or(METRICS_ERROR),
433            );
434        db_metrics
435            .cf_metrics
436            .rocksdb_mem_table_flush_pending
437            .with_label_values(&[cf_name])
438            .set(
439                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
440                    .unwrap_or(METRICS_ERROR),
441            );
442        db_metrics
443            .cf_metrics
444            .rocksdb_compaction_pending
445            .with_label_values(&[cf_name])
446            .set(
447                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
448                    .unwrap_or(METRICS_ERROR),
449            );
450        db_metrics
451            .cf_metrics
452            .rocksdb_estimate_pending_compaction_bytes
453            .with_label_values(&[cf_name])
454            .set(
455                Self::get_rocksdb_int_property(
456                    rocksdb,
457                    &cf,
458                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
459                )
460                .unwrap_or(METRICS_ERROR),
461            );
462        db_metrics
463            .cf_metrics
464            .rocksdb_num_running_compactions
465            .with_label_values(&[cf_name])
466            .set(
467                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
468                    .unwrap_or(METRICS_ERROR),
469            );
470        db_metrics
471            .cf_metrics
472            .rocksdb_num_running_flushes
473            .with_label_values(&[cf_name])
474            .set(
475                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
476                    .unwrap_or(METRICS_ERROR),
477            );
478        db_metrics
479            .cf_metrics
480            .rocksdb_estimate_oldest_key_time
481            .with_label_values(&[cf_name])
482            .set(
483                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
484                    .unwrap_or(METRICS_ERROR),
485            );
486        db_metrics
487            .cf_metrics
488            .rocksdb_background_errors
489            .with_label_values(&[cf_name])
490            .set(
491                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
492                    .unwrap_or(METRICS_ERROR),
493            );
494        db_metrics
495            .cf_metrics
496            .rocksdb_base_level
497            .with_label_values(&[cf_name])
498            .set(
499                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
500                    .unwrap_or(METRICS_ERROR),
501            );
502    }
503}