1pub mod errors;
6pub(crate) mod options;
7pub(crate) mod rocks_util;
8pub(crate) mod safe_iter;
9
10use std::{
11 collections::HashSet,
12 ffi::CStr,
13 path::{Path, PathBuf},
14 sync::Arc,
15 time::Duration,
16};
17
18use backoff::backoff::Backoff;
19use iota_macros::nondeterministic;
20use rocksdb::{
21 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, properties,
22 properties::num_files_at_level,
23};
24use tracing::warn;
25use typed_store_error::TypedStoreError;
26
27pub use crate::{
28 database::{DBBatch, DBMap, MetricConf},
29 rocks::options::{
30 DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, list_tables,
31 read_size_from_env,
32 },
33};
34use crate::{
35 database::{Database, Storage},
36 metrics::DBMetrics,
37 rocks::errors::typed_store_err_from_rocks_err,
38};
39
/// Name of RocksDB's `rocksdb.total-blob-file-size` integer property.
///
/// Spelled out by hand rather than taken from `rocksdb::properties` like the
/// other property names used in this file.
/// NOTE(review): check whether the rust-rocksdb version in use now exports it.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    // SAFETY: the byte string ends with exactly one NUL byte and contains no
    // interior NUL, which is what `from_bytes_with_nul_unchecked` requires.
    unsafe { CStr::from_bytes_with_nul_unchecked(b"rocksdb.total-blob-file-size\0") };

/// Sentinel written to a metrics gauge when a RocksDB property cannot be read.
const METRICS_ERROR: i64 = -1;

/// Key under which the database-corruption marker byte is stored.
const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";
48
49#[cfg(test)]
50mod tests;
51
/// Reopens one or more column families of an already-opened database as typed
/// `DBMap`s, yielding them in argument order (as a tuple when more than one is
/// given).
///
/// Each column family argument has the form `cf_name;<KeyType, ValueType>`,
/// e.g. `reopen!(&db, "first";<u64, String>, "second";<String, u32>)`.
///
/// The expansion names `DBMap` and `ReadWriteOptions` unqualified, so both must
/// be in scope at the call site.
///
/// # Panics
///
/// Panics if any column family cannot be reopened. The message is
/// `Cannot open <cf> CF.: <error Debug>`.
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false)
                    // Build the panic message lazily: `expect(&format!(..))` allocated
                    // it on every successful reopen too. The text matches what
                    // `expect` printed (`{msg}: {err:?}`).
                    .unwrap_or_else(|err| panic!("Cannot open {} CF.: {:?}", $cf, err))
            ),*
        )
    };
}
63
64#[derive(Debug)]
65pub(crate) struct RocksDB {
66 pub(crate) underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
67}
68
69impl Drop for RocksDB {
70 fn drop(&mut self) {
71 self.underlying.cancel_all_background_work(true);
72 }
73}
74
75pub(crate) fn rocks_cf<'a>(
76 rocks_db: &'a RocksDB,
77 cf_name: &str,
78) -> Arc<rocksdb::BoundColumnFamily<'a>> {
79 rocks_db
80 .underlying
81 .cf_handle(cf_name)
82 .expect("Map-keying column family should have been checked at DB creation")
83}
84
85pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
88 let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
89
90 db.get(DB_CORRUPTED_KEY)
91 .map_err(|e| format!("Failed to open database: {e}"))
92 .and_then(|value| match value {
93 Some(v) if v[0] == 1 => Err(
94 "Database is corrupted, please remove the current database and start clean!"
95 .to_string(),
96 ),
97 Some(_) => Ok(()),
98 None => db
99 .put(DB_CORRUPTED_KEY, [1])
100 .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
101 })?;
102
103 Ok(())
104}
105
106pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
107 rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
108}
109
110#[tracing::instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
113pub fn open_cf_opts<P: AsRef<Path>>(
114 path: P,
115 db_options: Option<rocksdb::Options>,
116 metric_conf: MetricConf,
117 opt_cfs: &[(&str, rocksdb::Options)],
118) -> Result<Arc<Database>, TypedStoreError> {
119 let path = path.as_ref();
120 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
130 nondeterministic!({
131 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
132 options.create_if_missing(true);
133 options.create_missing_column_families(true);
134 let rocksdb = {
135 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
136 &options,
137 path,
138 cfs.into_iter()
139 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
140 )
141 .map_err(typed_store_err_from_rocks_err)?
142 };
143 Ok(Arc::new(Database::new(
144 Storage::Rocks(RocksDB {
145 underlying: rocksdb,
146 }),
147 metric_conf,
148 )))
149 })
150}
151
152pub fn open_cf_opts_secondary<P: AsRef<Path>>(
155 primary_path: P,
156 secondary_path: Option<P>,
157 db_options: Option<rocksdb::Options>,
158 metric_conf: MetricConf,
159 opt_cfs: &[(&str, rocksdb::Options)],
160) -> Result<Arc<Database>, TypedStoreError> {
161 let primary_path = primary_path.as_ref();
162 let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
163 nondeterministic!({
165 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
167
168 fdlimit::raise_fd_limit();
169 options.set_max_open_files(-1);
171
172 let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
173 let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
174 .ok()
175 .unwrap_or_default();
176
177 let default_db_options = default_db_options();
178 for cf_key in cfs.iter() {
180 if !opt_cfs.contains_key(&cf_key[..]) {
181 opt_cfs.insert(cf_key, default_db_options.options.clone());
182 }
183 }
184
185 let primary_path = primary_path.to_path_buf();
186 let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
187 let mut s = primary_path.clone();
188 s.pop();
189 s.push("SECONDARY");
190 s.as_path().to_path_buf()
191 });
192
193 let rocksdb = {
194 options.create_if_missing(true);
195 options.create_missing_column_families(true);
196 let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
197 &options,
198 &primary_path,
199 &secondary_path,
200 opt_cfs
201 .iter()
202 .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
203 )
204 .map_err(typed_store_err_from_rocks_err)?;
205 db.try_catch_up_with_primary()
206 .map_err(typed_store_err_from_rocks_err)?;
207 db
208 };
209 Ok(Arc::new(Database::new(
210 Storage::Rocks(RocksDB {
211 underlying: rocksdb,
212 }),
213 metric_conf,
214 )))
215 })
216}
217
218pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
220 let mut backoff = backoff::ExponentialBackoff {
221 max_elapsed_time: Some(timeout),
222 ..Default::default()
223 };
224 loop {
225 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
226 Ok(()) => return Ok(()),
227 Err(err) => match backoff.next_backoff() {
228 Some(duration) => tokio::time::sleep(duration).await,
229 None => return Err(err),
230 },
231 }
232 }
233}
234
235fn populate_missing_cfs(
236 input_cfs: &[(&str, rocksdb::Options)],
237 path: &Path,
238) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
239 let mut cfs = vec![];
240 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
241 let existing_cfs =
242 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
243 .ok()
244 .unwrap_or_default();
245
246 for cf_name in existing_cfs {
247 if !input_cf_index.contains(&cf_name[..]) {
248 cfs.push((cf_name, rocksdb::Options::default()));
249 }
250 }
251 cfs.extend(
252 input_cfs
253 .iter()
254 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
255 );
256 Ok(cfs)
257}
258
259impl<K, V> DBMap<K, V> {
263 fn get_rocksdb_int_property(
264 rocksdb: &RocksDB,
265 cf: &impl AsColumnFamilyRef,
266 property_name: &CStr,
267 ) -> Result<i64, TypedStoreError> {
268 match rocksdb.underlying.property_int_value_cf(cf, property_name) {
269 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
270 Ok(None) => Ok(0),
271 Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
272 }
273 }
274
275 pub(crate) fn report_rocksdb_metrics(
276 database: &Arc<Database>,
277 cf_name: &str,
278 db_metrics: &Arc<DBMetrics>,
279 ) {
280 let Storage::Rocks(rocksdb) = &database.storage else {
281 return;
282 };
283
284 let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
285 warn!(
286 "unable to report metrics for cf {cf_name:?} in db {:?}",
287 database.db_name()
288 );
289 return;
290 };
291
292 db_metrics
293 .cf_metrics
294 .rocksdb_total_sst_files_size
295 .with_label_values(&[cf_name])
296 .set(
297 Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
298 .unwrap_or(METRICS_ERROR),
299 );
300 db_metrics
301 .cf_metrics
302 .rocksdb_total_blob_files_size
303 .with_label_values(&[cf_name])
304 .set(
305 Self::get_rocksdb_int_property(
306 rocksdb,
307 &cf,
308 ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
309 )
310 .unwrap_or(METRICS_ERROR),
311 );
312 let total_num_files: i64 = (0..=6)
316 .map(|level| {
317 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
318 .unwrap_or(METRICS_ERROR)
319 })
320 .sum();
321 db_metrics
322 .cf_metrics
323 .rocksdb_total_num_files
324 .with_label_values(&[cf_name])
325 .set(total_num_files);
326 db_metrics
327 .cf_metrics
328 .rocksdb_num_level0_files
329 .with_label_values(&[cf_name])
330 .set(
331 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
332 .unwrap_or(METRICS_ERROR),
333 );
334 db_metrics
335 .cf_metrics
336 .rocksdb_current_size_active_mem_tables
337 .with_label_values(&[cf_name])
338 .set(
339 Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
340 .unwrap_or(METRICS_ERROR),
341 );
342 db_metrics
343 .cf_metrics
344 .rocksdb_size_all_mem_tables
345 .with_label_values(&[cf_name])
346 .set(
347 Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
348 .unwrap_or(METRICS_ERROR),
349 );
350 db_metrics
351 .cf_metrics
352 .rocksdb_num_snapshots
353 .with_label_values(&[cf_name])
354 .set(
355 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
356 .unwrap_or(METRICS_ERROR),
357 );
358 db_metrics
359 .cf_metrics
360 .rocksdb_oldest_snapshot_time
361 .with_label_values(&[cf_name])
362 .set(
363 Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
364 .unwrap_or(METRICS_ERROR),
365 );
366 db_metrics
367 .cf_metrics
368 .rocksdb_actual_delayed_write_rate
369 .with_label_values(&[cf_name])
370 .set(
371 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
372 .unwrap_or(METRICS_ERROR),
373 );
374 db_metrics
375 .cf_metrics
376 .rocksdb_is_write_stopped
377 .with_label_values(&[cf_name])
378 .set(
379 Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
380 .unwrap_or(METRICS_ERROR),
381 );
382 db_metrics
383 .cf_metrics
384 .rocksdb_block_cache_capacity
385 .with_label_values(&[cf_name])
386 .set(
387 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
388 .unwrap_or(METRICS_ERROR),
389 );
390 db_metrics
391 .cf_metrics
392 .rocksdb_block_cache_usage
393 .with_label_values(&[cf_name])
394 .set(
395 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
396 .unwrap_or(METRICS_ERROR),
397 );
398 db_metrics
399 .cf_metrics
400 .rocksdb_block_cache_pinned_usage
401 .with_label_values(&[cf_name])
402 .set(
403 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
404 .unwrap_or(METRICS_ERROR),
405 );
406 db_metrics
407 .cf_metrics
408 .rocksdb_estimate_table_readers_mem
409 .with_label_values(&[cf_name])
410 .set(
411 Self::get_rocksdb_int_property(
412 rocksdb,
413 &cf,
414 properties::ESTIMATE_TABLE_READERS_MEM,
415 )
416 .unwrap_or(METRICS_ERROR),
417 );
418 db_metrics
419 .cf_metrics
420 .rocksdb_estimated_num_keys
421 .with_label_values(&[cf_name])
422 .set(
423 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
424 .unwrap_or(METRICS_ERROR),
425 );
426 db_metrics
427 .cf_metrics
428 .rocksdb_num_immutable_mem_tables
429 .with_label_values(&[cf_name])
430 .set(
431 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
432 .unwrap_or(METRICS_ERROR),
433 );
434 db_metrics
435 .cf_metrics
436 .rocksdb_mem_table_flush_pending
437 .with_label_values(&[cf_name])
438 .set(
439 Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
440 .unwrap_or(METRICS_ERROR),
441 );
442 db_metrics
443 .cf_metrics
444 .rocksdb_compaction_pending
445 .with_label_values(&[cf_name])
446 .set(
447 Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
448 .unwrap_or(METRICS_ERROR),
449 );
450 db_metrics
451 .cf_metrics
452 .rocksdb_estimate_pending_compaction_bytes
453 .with_label_values(&[cf_name])
454 .set(
455 Self::get_rocksdb_int_property(
456 rocksdb,
457 &cf,
458 properties::ESTIMATE_PENDING_COMPACTION_BYTES,
459 )
460 .unwrap_or(METRICS_ERROR),
461 );
462 db_metrics
463 .cf_metrics
464 .rocksdb_num_running_compactions
465 .with_label_values(&[cf_name])
466 .set(
467 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
468 .unwrap_or(METRICS_ERROR),
469 );
470 db_metrics
471 .cf_metrics
472 .rocksdb_num_running_flushes
473 .with_label_values(&[cf_name])
474 .set(
475 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
476 .unwrap_or(METRICS_ERROR),
477 );
478 db_metrics
479 .cf_metrics
480 .rocksdb_estimate_oldest_key_time
481 .with_label_values(&[cf_name])
482 .set(
483 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
484 .unwrap_or(METRICS_ERROR),
485 );
486 db_metrics
487 .cf_metrics
488 .rocksdb_background_errors
489 .with_label_values(&[cf_name])
490 .set(
491 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
492 .unwrap_or(METRICS_ERROR),
493 );
494 db_metrics
495 .cf_metrics
496 .rocksdb_base_level
497 .with_label_values(&[cf_name])
498 .set(
499 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
500 .unwrap_or(METRICS_ERROR),
501 );
502 }
503}