1pub mod errors;
6pub(crate) mod options;
7pub(crate) mod rocks_util;
8pub(crate) mod safe_iter;
9
10use std::{
11 collections::HashSet,
12 ffi::CStr,
13 path::{Path, PathBuf},
14 sync::Arc,
15 time::Duration,
16};
17
18use backoff::backoff::Backoff;
19use iota_macros::nondeterministic;
20use rocksdb::{
21 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, properties,
22 properties::num_files_at_level,
23};
24use tracing::warn;
25use typed_store_error::TypedStoreError;
26
27pub use crate::{
28 database::{DBBatch, DBMap, MetricConf},
29 rocks::options::{
30 DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, list_tables,
31 read_size_from_env,
32 },
33};
34use crate::{
35 database::{Database, Storage},
36 metrics::DBMetrics,
37 rocks::errors::typed_store_err_from_rocks_err,
38};
39
/// Name of the RocksDB integer property `rocksdb.total-blob-file-size`, as a
/// NUL-terminated C string suitable for the property lookup APIs.
///
/// The literal is validated at compile time through the checked
/// `CStr::from_bytes_with_nul`: a missing or interior NUL byte aborts the
/// build instead of silently producing an invalid `CStr` through `unsafe`.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        Err(_) => panic!("property name must end with exactly one NUL byte"),
    };
44
/// Sentinel value written to a gauge when the corresponding RocksDB property
/// could not be read (used as the `unwrap_or` fallback in
/// `report_rocksdb_metrics`).
const METRICS_ERROR: i64 = -1;
46
/// Key in the default column family under which the corruption marker is
/// stored. `check_and_mark_db_corruption` treats a value whose first byte is
/// `1` as "corrupted" and writes `[1]` when the key is absent;
/// `unmark_db_corruption` writes `[0]`.
const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";
48
49#[cfg(test)]
50mod tests;
51
/// Wrapper around a multi-threaded RocksDB handle.
///
/// The wrapper exists so that a `Drop` implementation can be attached to the
/// handle (see `impl Drop for RocksDB`).
#[derive(Debug)]
pub(crate) struct RocksDB {
    /// The wrapped RocksDB instance, opened in `MultiThreaded` mode.
    pub(crate) underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
56
impl Drop for RocksDB {
    /// Cancels RocksDB background work before the handle is released.
    fn drop(&mut self) {
        // The `true` argument asks RocksDB to wait for the background jobs to
        // finish rather than only signalling cancellation.
        self.underlying.cancel_all_background_work(true);
    }
}
62
63pub(crate) fn rocks_cf<'a>(
64 rocks_db: &'a RocksDB,
65 cf_name: &str,
66) -> Arc<rocksdb::BoundColumnFamily<'a>> {
67 rocks_db
68 .underlying
69 .cf_handle(cf_name)
70 .expect("Map-keying column family should have been checked at DB creation")
71}
72
73pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
76 let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
77
78 db.get(DB_CORRUPTED_KEY)
79 .map_err(|e| format!("Failed to open database: {e}"))
80 .and_then(|value| match value {
81 Some(v) if v[0] == 1 => Err(
82 "Database is corrupted, please remove the current database and start clean!"
83 .to_string(),
84 ),
85 Some(_) => Ok(()),
86 None => db
87 .put(DB_CORRUPTED_KEY, [1])
88 .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
89 })?;
90
91 Ok(())
92}
93
94pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
95 rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
96}
97
98#[tracing::instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
101pub fn open_cf_opts<P: AsRef<Path>>(
102 path: P,
103 db_options: Option<rocksdb::Options>,
104 metric_conf: MetricConf,
105 opt_cfs: &[(&str, rocksdb::Options)],
106) -> Result<Arc<Database>, TypedStoreError> {
107 let path = path.as_ref();
108 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
118 nondeterministic!({
119 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
120 options.create_if_missing(true);
121 options.create_missing_column_families(true);
122 let rocksdb = {
123 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
124 &options,
125 path,
126 cfs.into_iter()
127 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
128 )
129 .map_err(typed_store_err_from_rocks_err)?
130 };
131 Ok(Arc::new(Database::new(
132 Storage::Rocks(RocksDB {
133 underlying: rocksdb,
134 }),
135 metric_conf,
136 )))
137 })
138}
139
140pub fn open_cf_opts_secondary<P: AsRef<Path>>(
143 primary_path: P,
144 secondary_path: Option<P>,
145 db_options: Option<rocksdb::Options>,
146 metric_conf: MetricConf,
147 opt_cfs: &[(&str, rocksdb::Options)],
148) -> Result<Arc<Database>, TypedStoreError> {
149 let primary_path = primary_path.as_ref();
150 let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
151 nondeterministic!({
153 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
155
156 fdlimit::raise_fd_limit();
157 options.set_max_open_files(-1);
159
160 let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
161 let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
162 .ok()
163 .unwrap_or_default();
164
165 let default_db_options = default_db_options();
166 for cf_key in cfs.iter() {
168 if !opt_cfs.contains_key(&cf_key[..]) {
169 opt_cfs.insert(cf_key, default_db_options.options.clone());
170 }
171 }
172
173 let primary_path = primary_path.to_path_buf();
174 let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
175 let mut s = primary_path.clone();
176 s.pop();
177 s.push("SECONDARY");
178 s.as_path().to_path_buf()
179 });
180
181 let rocksdb = {
182 options.create_if_missing(true);
183 options.create_missing_column_families(true);
184 let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
185 &options,
186 &primary_path,
187 &secondary_path,
188 opt_cfs
189 .iter()
190 .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
191 )
192 .map_err(typed_store_err_from_rocks_err)?;
193 db.try_catch_up_with_primary()
194 .map_err(typed_store_err_from_rocks_err)?;
195 db
196 };
197 Ok(Arc::new(Database::new(
198 Storage::Rocks(RocksDB {
199 underlying: rocksdb,
200 }),
201 metric_conf,
202 )))
203 })
204}
205
206pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
208 let mut backoff = backoff::ExponentialBackoff {
209 max_elapsed_time: Some(timeout),
210 ..Default::default()
211 };
212 loop {
213 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
214 Ok(()) => return Ok(()),
215 Err(err) => match backoff.next_backoff() {
216 Some(duration) => tokio::time::sleep(duration).await,
217 None => return Err(err),
218 },
219 }
220 }
221}
222
223fn populate_missing_cfs(
224 input_cfs: &[(&str, rocksdb::Options)],
225 path: &Path,
226) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
227 let mut cfs = vec![];
228 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
229 let existing_cfs =
230 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
231 .ok()
232 .unwrap_or_default();
233
234 for cf_name in existing_cfs {
235 if !input_cf_index.contains(&cf_name[..]) {
236 cfs.push((cf_name, rocksdb::Options::default()));
237 }
238 }
239 cfs.extend(
240 input_cfs
241 .iter()
242 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
243 );
244 Ok(cfs)
245}
246
247impl<K, V> DBMap<K, V> {
251 fn get_rocksdb_int_property(
252 rocksdb: &RocksDB,
253 cf: &impl AsColumnFamilyRef,
254 property_name: &CStr,
255 ) -> Result<i64, TypedStoreError> {
256 match rocksdb.underlying.property_int_value_cf(cf, property_name) {
257 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
258 Ok(None) => Ok(0),
259 Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
260 }
261 }
262
263 pub(crate) fn report_rocksdb_metrics(
264 database: &Arc<Database>,
265 cf_name: &str,
266 db_metrics: &Arc<DBMetrics>,
267 ) {
268 let Storage::Rocks(rocksdb) = &database.storage else {
269 return;
270 };
271
272 let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
273 warn!(
274 "unable to report metrics for cf {cf_name:?} in db {:?}",
275 database.db_name()
276 );
277 return;
278 };
279
280 db_metrics
281 .cf_metrics
282 .rocksdb_total_sst_files_size
283 .with_label_values(&[cf_name])
284 .set(
285 Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
286 .unwrap_or(METRICS_ERROR),
287 );
288 db_metrics
289 .cf_metrics
290 .rocksdb_total_blob_files_size
291 .with_label_values(&[cf_name])
292 .set(
293 Self::get_rocksdb_int_property(
294 rocksdb,
295 &cf,
296 ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
297 )
298 .unwrap_or(METRICS_ERROR),
299 );
300 let total_num_files: i64 = (0..=6)
304 .map(|level| {
305 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
306 .unwrap_or(METRICS_ERROR)
307 })
308 .sum();
309 db_metrics
310 .cf_metrics
311 .rocksdb_total_num_files
312 .with_label_values(&[cf_name])
313 .set(total_num_files);
314 db_metrics
315 .cf_metrics
316 .rocksdb_num_level0_files
317 .with_label_values(&[cf_name])
318 .set(
319 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
320 .unwrap_or(METRICS_ERROR),
321 );
322 db_metrics
323 .cf_metrics
324 .rocksdb_current_size_active_mem_tables
325 .with_label_values(&[cf_name])
326 .set(
327 Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
328 .unwrap_or(METRICS_ERROR),
329 );
330 db_metrics
331 .cf_metrics
332 .rocksdb_size_all_mem_tables
333 .with_label_values(&[cf_name])
334 .set(
335 Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
336 .unwrap_or(METRICS_ERROR),
337 );
338 db_metrics
339 .cf_metrics
340 .rocksdb_num_snapshots
341 .with_label_values(&[cf_name])
342 .set(
343 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
344 .unwrap_or(METRICS_ERROR),
345 );
346 db_metrics
347 .cf_metrics
348 .rocksdb_oldest_snapshot_time
349 .with_label_values(&[cf_name])
350 .set(
351 Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
352 .unwrap_or(METRICS_ERROR),
353 );
354 db_metrics
355 .cf_metrics
356 .rocksdb_actual_delayed_write_rate
357 .with_label_values(&[cf_name])
358 .set(
359 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
360 .unwrap_or(METRICS_ERROR),
361 );
362 db_metrics
363 .cf_metrics
364 .rocksdb_is_write_stopped
365 .with_label_values(&[cf_name])
366 .set(
367 Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
368 .unwrap_or(METRICS_ERROR),
369 );
370 db_metrics
371 .cf_metrics
372 .rocksdb_block_cache_capacity
373 .with_label_values(&[cf_name])
374 .set(
375 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
376 .unwrap_or(METRICS_ERROR),
377 );
378 db_metrics
379 .cf_metrics
380 .rocksdb_block_cache_usage
381 .with_label_values(&[cf_name])
382 .set(
383 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
384 .unwrap_or(METRICS_ERROR),
385 );
386 db_metrics
387 .cf_metrics
388 .rocksdb_block_cache_pinned_usage
389 .with_label_values(&[cf_name])
390 .set(
391 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
392 .unwrap_or(METRICS_ERROR),
393 );
394 db_metrics
395 .cf_metrics
396 .rocksdb_estimate_table_readers_mem
397 .with_label_values(&[cf_name])
398 .set(
399 Self::get_rocksdb_int_property(
400 rocksdb,
401 &cf,
402 properties::ESTIMATE_TABLE_READERS_MEM,
403 )
404 .unwrap_or(METRICS_ERROR),
405 );
406 db_metrics
407 .cf_metrics
408 .rocksdb_estimated_num_keys
409 .with_label_values(&[cf_name])
410 .set(
411 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
412 .unwrap_or(METRICS_ERROR),
413 );
414 db_metrics
415 .cf_metrics
416 .rocksdb_num_immutable_mem_tables
417 .with_label_values(&[cf_name])
418 .set(
419 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
420 .unwrap_or(METRICS_ERROR),
421 );
422 db_metrics
423 .cf_metrics
424 .rocksdb_mem_table_flush_pending
425 .with_label_values(&[cf_name])
426 .set(
427 Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
428 .unwrap_or(METRICS_ERROR),
429 );
430 db_metrics
431 .cf_metrics
432 .rocksdb_compaction_pending
433 .with_label_values(&[cf_name])
434 .set(
435 Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
436 .unwrap_or(METRICS_ERROR),
437 );
438 db_metrics
439 .cf_metrics
440 .rocksdb_estimate_pending_compaction_bytes
441 .with_label_values(&[cf_name])
442 .set(
443 Self::get_rocksdb_int_property(
444 rocksdb,
445 &cf,
446 properties::ESTIMATE_PENDING_COMPACTION_BYTES,
447 )
448 .unwrap_or(METRICS_ERROR),
449 );
450 db_metrics
451 .cf_metrics
452 .rocksdb_num_running_compactions
453 .with_label_values(&[cf_name])
454 .set(
455 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
456 .unwrap_or(METRICS_ERROR),
457 );
458 db_metrics
459 .cf_metrics
460 .rocksdb_num_running_flushes
461 .with_label_values(&[cf_name])
462 .set(
463 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
464 .unwrap_or(METRICS_ERROR),
465 );
466 db_metrics
467 .cf_metrics
468 .rocksdb_estimate_oldest_key_time
469 .with_label_values(&[cf_name])
470 .set(
471 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
472 .unwrap_or(METRICS_ERROR),
473 );
474 db_metrics
475 .cf_metrics
476 .rocksdb_background_errors
477 .with_label_values(&[cf_name])
478 .set(
479 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
480 .unwrap_or(METRICS_ERROR),
481 );
482 db_metrics
483 .cf_metrics
484 .rocksdb_base_level
485 .with_label_values(&[cf_name])
486 .set(
487 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
488 .unwrap_or(METRICS_ERROR),
489 );
490 }
491}