Skip to main content

typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5pub mod errors;
6pub(crate) mod options;
7pub(crate) mod rocks_util;
8pub(crate) mod safe_iter;
9
10use std::{
11    collections::HashSet,
12    ffi::CStr,
13    path::{Path, PathBuf},
14    sync::Arc,
15    time::Duration,
16};
17
18use backoff::backoff::Backoff;
19use iota_macros::nondeterministic;
20use rocksdb::{
21    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, properties,
22    properties::num_files_at_level,
23};
24use tracing::warn;
25use typed_store_error::TypedStoreError;
26
27pub use crate::{
28    database::{DBBatch, DBMap, MetricConf},
29    rocks::options::{
30        DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, list_tables,
31        read_size_from_env,
32    },
33};
34use crate::{
35    database::{Database, Storage},
36    metrics::DBMetrics,
37    rocks::errors::typed_store_err_from_rocks_err,
38};
39
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property
// built-in. From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
/// Name of the RocksDB `rocksdb.total-blob-file-size` property.
///
/// Built with the checked `CStr::from_bytes_with_nul` in a const context, so a
/// malformed literal (missing trailing NUL or an interior NUL) is rejected at
/// compile time instead of relying on an `unsafe` unchecked conversion.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        Err(_) => panic!("ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE must be a valid C string"),
    };

/// Value reported to a metric when the corresponding RocksDB property cannot
/// be read.
const METRICS_ERROR: i64 = -1;

/// Key under which the database corruption marker is stored.
const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";
48
49#[cfg(test)]
50mod tests;
51
/// Wrapper that owns a multi-threaded RocksDB database handle.
///
/// Dropping it cancels the database's background work (see the `Drop` impl
/// for `RocksDB`).
#[derive(Debug)]
pub(crate) struct RocksDB {
    /// The wrapped RocksDB handle; accessed directly by crate-internal code.
    pub(crate) underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
56
57impl Drop for RocksDB {
58    fn drop(&mut self) {
59        self.underlying.cancel_all_background_work(/* wait */ true);
60    }
61}
62
63pub(crate) fn rocks_cf<'a>(
64    rocks_db: &'a RocksDB,
65    cf_name: &str,
66) -> Arc<rocksdb::BoundColumnFamily<'a>> {
67    rocks_db
68        .underlying
69        .cf_handle(cf_name)
70        .expect("Map-keying column family should have been checked at DB creation")
71}
72
73// Check if the database is corrupted, and if so, panic.
74// If the corrupted key is not set, we set it to [1].
75pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
76    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
77
78    db.get(DB_CORRUPTED_KEY)
79        .map_err(|e| format!("Failed to open database: {e}"))
80        .and_then(|value| match value {
81            Some(v) if v[0] == 1 => Err(
82                "Database is corrupted, please remove the current database and start clean!"
83                    .to_string(),
84            ),
85            Some(_) => Ok(()),
86            None => db
87                .put(DB_CORRUPTED_KEY, [1])
88                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
89        })?;
90
91    Ok(())
92}
93
94pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
95    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
96}
97
98/// Opens a database with options, and a number of column families with
99/// individual options that are created if they do not exist.
100#[tracing::instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
101pub fn open_cf_opts<P: AsRef<Path>>(
102    path: P,
103    db_options: Option<rocksdb::Options>,
104    metric_conf: MetricConf,
105    opt_cfs: &[(&str, rocksdb::Options)],
106) -> Result<Arc<Database>, TypedStoreError> {
107    let path = path.as_ref();
108    // In the simulator, we intercept the wall clock in the test thread only. This
109    // causes problems because rocksdb uses the simulated clock when creating
110    // its background threads, but then those threads see the real wall clock
111    // (because they are not the test thread), which causes rocksdb to panic.
112    // The `nondeterministic` macro evaluates expressions in new threads, which
113    // resolves the issue.
114    //
115    // This is a no-op in non-simulator builds.
116
117    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
118    nondeterministic!({
119        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
120        options.create_if_missing(true);
121        options.create_missing_column_families(true);
122        let rocksdb = {
123            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
124                &options,
125                path,
126                cfs.into_iter()
127                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
128            )
129            .map_err(typed_store_err_from_rocks_err)?
130        };
131        Ok(Arc::new(Database::new(
132            Storage::Rocks(RocksDB {
133                underlying: rocksdb,
134            }),
135            metric_conf,
136        )))
137    })
138}
139
140/// Opens a database with options, and a number of column families with
141/// individual options that are created if they do not exist.
142pub fn open_cf_opts_secondary<P: AsRef<Path>>(
143    primary_path: P,
144    secondary_path: Option<P>,
145    db_options: Option<rocksdb::Options>,
146    metric_conf: MetricConf,
147    opt_cfs: &[(&str, rocksdb::Options)],
148) -> Result<Arc<Database>, TypedStoreError> {
149    let primary_path = primary_path.as_ref();
150    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
151    // See comment above for explanation of why nondeterministic is necessary here.
152    nondeterministic!({
153        // Customize database options
154        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
155
156        fdlimit::raise_fd_limit();
157        // This is a requirement by RocksDB when opening as secondary
158        options.set_max_open_files(-1);
159
160        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
161        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
162            .ok()
163            .unwrap_or_default();
164
165        let default_db_options = default_db_options();
166        // Add CFs not explicitly listed
167        for cf_key in cfs.iter() {
168            if !opt_cfs.contains_key(&cf_key[..]) {
169                opt_cfs.insert(cf_key, default_db_options.options.clone());
170            }
171        }
172
173        let primary_path = primary_path.to_path_buf();
174        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
175            let mut s = primary_path.clone();
176            s.pop();
177            s.push("SECONDARY");
178            s.as_path().to_path_buf()
179        });
180
181        let rocksdb = {
182            options.create_if_missing(true);
183            options.create_missing_column_families(true);
184            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
185                &options,
186                &primary_path,
187                &secondary_path,
188                opt_cfs
189                    .iter()
190                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
191            )
192            .map_err(typed_store_err_from_rocks_err)?;
193            db.try_catch_up_with_primary()
194                .map_err(typed_store_err_from_rocks_err)?;
195            db
196        };
197        Ok(Arc::new(Database::new(
198            Storage::Rocks(RocksDB {
199                underlying: rocksdb,
200            }),
201            metric_conf,
202        )))
203    })
204}
205
206// Drops a database if there is no other handle to it, with retries and timeout.
207pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
208    let mut backoff = backoff::ExponentialBackoff {
209        max_elapsed_time: Some(timeout),
210        ..Default::default()
211    };
212    loop {
213        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
214            Ok(()) => return Ok(()),
215            Err(err) => match backoff.next_backoff() {
216                Some(duration) => tokio::time::sleep(duration).await,
217                None => return Err(err),
218            },
219        }
220    }
221}
222
223fn populate_missing_cfs(
224    input_cfs: &[(&str, rocksdb::Options)],
225    path: &Path,
226) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
227    let mut cfs = vec![];
228    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
229    let existing_cfs =
230        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
231            .ok()
232            .unwrap_or_default();
233
234    for cf_name in existing_cfs {
235        if !input_cf_index.contains(&cf_name[..]) {
236            cfs.push((cf_name, rocksdb::Options::default()));
237        }
238    }
239    cfs.extend(
240        input_cfs
241            .iter()
242            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
243    );
244    Ok(cfs)
245}
246
247/// RocksDB-specific methods on `DBMap`. These are kept separate from the
248/// generic impl in `database.rs` because they directly access RocksDB
249/// internals and have no meaning for other storage backends.
250impl<K, V> DBMap<K, V> {
251    fn get_rocksdb_int_property(
252        rocksdb: &RocksDB,
253        cf: &impl AsColumnFamilyRef,
254        property_name: &CStr,
255    ) -> Result<i64, TypedStoreError> {
256        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
257            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
258            Ok(None) => Ok(0),
259            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
260        }
261    }
262
263    pub(crate) fn report_rocksdb_metrics(
264        database: &Arc<Database>,
265        cf_name: &str,
266        db_metrics: &Arc<DBMetrics>,
267    ) {
268        let Storage::Rocks(rocksdb) = &database.storage else {
269            return;
270        };
271
272        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
273            warn!(
274                "unable to report metrics for cf {cf_name:?} in db {:?}",
275                database.db_name()
276            );
277            return;
278        };
279
280        db_metrics
281            .cf_metrics
282            .rocksdb_total_sst_files_size
283            .with_label_values(&[cf_name])
284            .set(
285                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
286                    .unwrap_or(METRICS_ERROR),
287            );
288        db_metrics
289            .cf_metrics
290            .rocksdb_total_blob_files_size
291            .with_label_values(&[cf_name])
292            .set(
293                Self::get_rocksdb_int_property(
294                    rocksdb,
295                    &cf,
296                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
297                )
298                .unwrap_or(METRICS_ERROR),
299            );
300        // 7 is the default number of levels in RocksDB. If we ever change the number of
301        // levels using `set_num_levels`, we need to update here as well. Note
302        // that there isn't an API to query the DB to get the number of levels (yet).
303        let total_num_files: i64 = (0..=6)
304            .map(|level| {
305                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
306                    .unwrap_or(METRICS_ERROR)
307            })
308            .sum();
309        db_metrics
310            .cf_metrics
311            .rocksdb_total_num_files
312            .with_label_values(&[cf_name])
313            .set(total_num_files);
314        db_metrics
315            .cf_metrics
316            .rocksdb_num_level0_files
317            .with_label_values(&[cf_name])
318            .set(
319                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
320                    .unwrap_or(METRICS_ERROR),
321            );
322        db_metrics
323            .cf_metrics
324            .rocksdb_current_size_active_mem_tables
325            .with_label_values(&[cf_name])
326            .set(
327                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
328                    .unwrap_or(METRICS_ERROR),
329            );
330        db_metrics
331            .cf_metrics
332            .rocksdb_size_all_mem_tables
333            .with_label_values(&[cf_name])
334            .set(
335                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
336                    .unwrap_or(METRICS_ERROR),
337            );
338        db_metrics
339            .cf_metrics
340            .rocksdb_num_snapshots
341            .with_label_values(&[cf_name])
342            .set(
343                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
344                    .unwrap_or(METRICS_ERROR),
345            );
346        db_metrics
347            .cf_metrics
348            .rocksdb_oldest_snapshot_time
349            .with_label_values(&[cf_name])
350            .set(
351                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
352                    .unwrap_or(METRICS_ERROR),
353            );
354        db_metrics
355            .cf_metrics
356            .rocksdb_actual_delayed_write_rate
357            .with_label_values(&[cf_name])
358            .set(
359                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
360                    .unwrap_or(METRICS_ERROR),
361            );
362        db_metrics
363            .cf_metrics
364            .rocksdb_is_write_stopped
365            .with_label_values(&[cf_name])
366            .set(
367                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
368                    .unwrap_or(METRICS_ERROR),
369            );
370        db_metrics
371            .cf_metrics
372            .rocksdb_block_cache_capacity
373            .with_label_values(&[cf_name])
374            .set(
375                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
376                    .unwrap_or(METRICS_ERROR),
377            );
378        db_metrics
379            .cf_metrics
380            .rocksdb_block_cache_usage
381            .with_label_values(&[cf_name])
382            .set(
383                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
384                    .unwrap_or(METRICS_ERROR),
385            );
386        db_metrics
387            .cf_metrics
388            .rocksdb_block_cache_pinned_usage
389            .with_label_values(&[cf_name])
390            .set(
391                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
392                    .unwrap_or(METRICS_ERROR),
393            );
394        db_metrics
395            .cf_metrics
396            .rocksdb_estimate_table_readers_mem
397            .with_label_values(&[cf_name])
398            .set(
399                Self::get_rocksdb_int_property(
400                    rocksdb,
401                    &cf,
402                    properties::ESTIMATE_TABLE_READERS_MEM,
403                )
404                .unwrap_or(METRICS_ERROR),
405            );
406        db_metrics
407            .cf_metrics
408            .rocksdb_estimated_num_keys
409            .with_label_values(&[cf_name])
410            .set(
411                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
412                    .unwrap_or(METRICS_ERROR),
413            );
414        db_metrics
415            .cf_metrics
416            .rocksdb_num_immutable_mem_tables
417            .with_label_values(&[cf_name])
418            .set(
419                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
420                    .unwrap_or(METRICS_ERROR),
421            );
422        db_metrics
423            .cf_metrics
424            .rocksdb_mem_table_flush_pending
425            .with_label_values(&[cf_name])
426            .set(
427                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
428                    .unwrap_or(METRICS_ERROR),
429            );
430        db_metrics
431            .cf_metrics
432            .rocksdb_compaction_pending
433            .with_label_values(&[cf_name])
434            .set(
435                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
436                    .unwrap_or(METRICS_ERROR),
437            );
438        db_metrics
439            .cf_metrics
440            .rocksdb_estimate_pending_compaction_bytes
441            .with_label_values(&[cf_name])
442            .set(
443                Self::get_rocksdb_int_property(
444                    rocksdb,
445                    &cf,
446                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
447                )
448                .unwrap_or(METRICS_ERROR),
449            );
450        db_metrics
451            .cf_metrics
452            .rocksdb_num_running_compactions
453            .with_label_values(&[cf_name])
454            .set(
455                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
456                    .unwrap_or(METRICS_ERROR),
457            );
458        db_metrics
459            .cf_metrics
460            .rocksdb_num_running_flushes
461            .with_label_values(&[cf_name])
462            .set(
463                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
464                    .unwrap_or(METRICS_ERROR),
465            );
466        db_metrics
467            .cf_metrics
468            .rocksdb_estimate_oldest_key_time
469            .with_label_values(&[cf_name])
470            .set(
471                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
472                    .unwrap_or(METRICS_ERROR),
473            );
474        db_metrics
475            .cf_metrics
476            .rocksdb_background_errors
477            .with_label_values(&[cf_name])
478            .set(
479                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
480                    .unwrap_or(METRICS_ERROR),
481            );
482        db_metrics
483            .cf_metrics
484            .rocksdb_base_level
485            .with_label_values(&[cf_name])
486            .set(
487                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
488                    .unwrap_or(METRICS_ERROR),
489            );
490    }
491}