1pub mod errors;
6pub(crate) mod safe_iter;
7
8use std::{
9 borrow::Borrow,
10 collections::{BTreeMap, HashSet},
11 env,
12 ffi::CStr,
13 marker::PhantomData,
14 ops::{Bound, RangeBounds},
15 path::{Path, PathBuf},
16 sync::Arc,
17 time::Duration,
18};
19
20use backoff::backoff::Backoff;
21use bincode::Options;
22use iota_macros::{fail_point, nondeterministic};
23use prometheus::{Histogram, HistogramTimer};
24use rocksdb::{
25 AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
26 ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, LiveFile,
27 MultiThreaded, OptimisticTransactionDB, ReadOptions, SnapshotWithThreadMode, WriteBatch,
28 WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
29};
30use serde::{Serialize, de::DeserializeOwned};
31use tap::TapFallible;
32use tokio::sync::oneshot;
33use tracing::{debug, error, info, instrument, warn};
34
35use crate::{
36 TypedStoreError,
37 metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
38 rocks::{
39 errors::{
40 typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
41 typed_store_err_from_rocks_err,
42 },
43 safe_iter::{SafeIter, SafeRevIter},
44 },
45 traits::{Map, TableSummary},
46};
47
// Environment-variable overrides for RocksDB tuning knobs, with the defaults
// used when the variable is unset. Sizes are in MB unless the name says
// otherwise.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Number of L0 files that triggers a compaction; the universal-compaction
// default is deliberately much higher.
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// Presence/value of this variable disables blob storage (BlobDB).
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

// Property name for the total blob-file size; not exposed as a constant by the
// `rocksdb` crate's `properties` module, hence the manual CStr.
// SAFETY: the literal ends with an explicit "\0" and contains no interior NUL
// bytes, so it satisfies `CStr`'s invariants.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

// Sentinel key used to flag a database as possibly corrupted; see
// `check_and_mark_db_corruption` / `unmark_db_corruption`.
const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";
81
82#[cfg(test)]
83mod tests;
84
/// Opens several `DBMap`s over column families of an already-open database,
/// returning them as a tuple:
/// `reopen!(&db, "cf_a";<K, V>, "cf_b";<K2, V2>)`.
///
/// Panics if any column family cannot be opened.
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                // `unwrap_or_else` avoids building the panic message on the
                // success path (clippy::expect_fun_call) and includes the
                // underlying error in the panic.
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false)
                    .unwrap_or_else(|e| panic!("Cannot open {} CF: {:?}", $cf, e))
            ),*
        )
    };
}
132
/// Wrapper around a multi-threaded RocksDB handle that ties metrics
/// registration to the value's lifetime: construction increments the
/// active-DB gauge and `Drop` decrements it.
#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}
139
impl Drop for RocksDB {
    fn drop(&mut self) {
        // Cancel background compactions/flushes before the handle closes
        // (`true` = wait for in-flight work), then release this DB's slot in
        // the active-DB metric registered in `RocksDB::new`.
        self.underlying.cancel_all_background_work(true);
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}
146
impl RocksDB {
    /// Wraps an already-opened database and registers it with the global
    /// [`DBMetrics`] active-DB counter (decremented again in `Drop`).
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }

    /// Point lookup in the default column family.
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        self.underlying.get(key)
    }

    /// Multi-get over (column family, key) pairs with explicit read options;
    /// results come back in input order.
    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        self.underlying.multi_get_cf_opt(keys, readopts)
    }

    /// Batched multi-get within one column family returning pinned
    /// (zero-copy) slices. `sorted_input` is a hint that `keys` are already
    /// in the CF's key order.
    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        self.underlying
            .batched_multi_get_cf_opt(cf, keys, sorted_input, readopts)
    }

    /// Reads an integer-valued RocksDB property for a column family.
    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        self.underlying.property_int_value_cf(cf, name)
    }

    /// Point lookup returning a pinned (zero-copy) slice.
    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        self.underlying.get_pinned_cf_opt(cf, key, readopts)
    }

    /// Looks up a column-family handle by name, if one was opened.
    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        self.underlying.cf_handle(name)
    }

    /// Creates a new column family with the given options.
    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        self.underlying.create_cf(name, opts)
    }

    /// Drops (deletes) a column family by name.
    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        self.underlying.drop_cf(name)
    }

    /// Deletes a key, bracketed by fail points for crash-injection tests.
    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = self.underlying.delete_cf_opt(cf, key, writeopts);
        fail_point!("delete-cf-after");
        // The binding is intentional: it lets the second fail point run
        // between the delete and the return.
        #[expect(clippy::let_and_return)]
        ret
    }

    /// Filesystem path the database was opened at.
    pub fn path(&self) -> &Path {
        self.underlying.path()
    }

    /// Writes a key/value pair, bracketed by fail points for crash-injection
    /// tests.
    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = self.underlying.put_cf_opt(cf, key, value, writeopts);
        fail_point!("put-cf-after");
        // The binding is intentional: it lets the second fail point run
        // between the put and the return.
        #[expect(clippy::let_and_return)]
        ret
    }

    /// Cheap, possibly false-positive existence check; a `true` result still
    /// needs a real read to confirm the key exists.
    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        self.underlying.key_may_exist_cf_opt(cf, key, readopts)
    }

    /// For secondary instances: catch up with the primary's latest state.
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.underlying.try_catch_up_with_primary()
    }

    /// Atomically commits a write batch, bracketed by fail points for
    /// crash-injection tests.
    pub fn write(
        &self,
        batch: rocksdb::WriteBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        self.underlying
            .write_opt(batch, writeopts)
            .map_err(typed_store_err_from_rocks_err)?;
        fail_point!("batch-write-after");

        Ok(())
    }

    /// Raw iterator over a column family with the supplied read options.
    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        self.underlying.raw_iterator_cf_opt(cf_handle, readopts)
    }

    /// Manually compacts the key range of a column family; `None` bounds
    /// mean "from the start" / "to the end".
    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        self.underlying.compact_range_cf(cf, start, end)
    }

    /// Like [`Self::compact_range_cf`] but also forces recompaction of the
    /// bottommost level (`ForceOptimized`).
    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        self.underlying.compact_range_cf_opt(cf, start, end, opt)
    }

    /// Flushes memtables of the default column family to disk.
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.underlying
            .flush()
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    /// Creates an on-disk checkpoint (consistent snapshot) at `path`.
    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint =
            Checkpoint::new(&self.underlying).map_err(typed_store_err_from_rocks_err)?;
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    /// Flushes the memtables of a specific column family.
    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        self.underlying.flush_cf(cf)
    }

    /// Dynamically updates column-family options from string key/value
    /// pairs.
    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        self.underlying.set_options_cf(cf, opts)
    }

    /// Fresh sampling interval for point-read perf-context sampling.
    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for multi-get perf-context sampling (shares
    /// the read interval configuration).
    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for write perf-context sampling.
    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for iterator perf-context sampling.
    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    /// Name used for metrics: the configured name, or the DB directory name
    /// when none was configured.
    pub fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    /// Fallback metrics name: the final component of the DB path.
    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    /// Metadata for all live SST files of the database.
    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        self.underlying.live_files()
    }
}
380
381pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
384 let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;
385
386 db.get(DB_CORRUPTED_KEY)
387 .map_err(|e| format!("Failed to open database: {e}"))
388 .and_then(|value| match value {
389 Some(v) if v[0] == 1 => Err(
390 "Database is corrupted, please remove the current database and start clean!"
391 .to_string(),
392 ),
393 Some(_) => Ok(()),
394 None => db
395 .put(DB_CORRUPTED_KEY, [1])
396 .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
397 })?;
398
399 Ok(())
400}
401
402pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
403 rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
404}
405
/// A point-in-time snapshot over either database flavor used by this crate.
pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}
410
impl<'a> RocksDBSnapshot<'a> {
    /// Multi-get against the snapshot with explicit read options, delegating
    /// to whichever DB flavor backs it.
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }
    /// Multi-get against the snapshot with default read options.
    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}
442
/// Per-database metrics configuration: the reporting name plus sampling
/// intervals controlling how often perf-context metrics are collected.
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}
450
451impl MetricConf {
452 pub fn new(db_name: &str) -> Self {
453 if db_name.is_empty() {
454 error!("A meaningful db name should be used for metrics reporting.")
455 }
456 Self {
457 db_name: db_name.to_string(),
458 read_sample_interval: SamplingInterval::default(),
459 write_sample_interval: SamplingInterval::default(),
460 iter_sample_interval: SamplingInterval::default(),
461 }
462 }
463
464 pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
465 Self {
466 db_name: self.db_name,
467 read_sample_interval: read_interval,
468 write_sample_interval: SamplingInterval::default(),
469 iter_sample_interval: SamplingInterval::default(),
470 }
471 }
472}
// How often the per-CF background task reports metrics, and the gauge value
// recorded when a RocksDB property read fails.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;
475
/// A typed map over one RocksDB column family: keys are serialized with the
/// big-endian fixed-int bincode config, values with BCS.
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    /// Marker tying `K`/`V` to the map without owning them; the
    /// function-pointer form is `Send`/`Sync` regardless of `K`/`V`.
    _phantom: PhantomData<fn(K) -> V>,
    /// Name of the column family this map reads and writes.
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    /// Dropping the last clone of this sender stops the background
    /// metrics-reporting task spawned in `DBMap::new`.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
491
// SAFETY(review): every field is `Send` on its own and the phantom is a
// function pointer (always `Send`/`Sync`), so this manual impl looks
// redundant with the auto-derived one — confirm whether it can be removed.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
493
impl<K, V> DBMap<K, V> {
    /// Builds a map over one column family and, unless `is_deprecated`,
    /// spawns a background task reporting per-CF metrics every
    /// `CF_METRICS_REPORT_PERIOD_SECS`. The task exits when the returned
    /// map's cancel handle (a oneshot sender) is dropped.
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            // Property reads can block, so keep them off the
                            // async runtime threads.
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        // Fires when the map (and its sender) is dropped.
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    /// Opens (or creates) a database at `path` with a single column family
    /// and returns a map over it.
    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }

    /// Reuses an already-open database handle for one of its column
    /// families; fails with `UnregisteredColumn` if the CF was not opened
    /// with the database.
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

    /// Starts an empty write batch bound to this map's database.
    pub fn batch(&self) -> DBBatch {
        let batch = WriteBatch::default();
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    /// Manually compacts the serialized key range `[start, end]`.
    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    /// Compacts a raw (already-serialized) key range of an arbitrary column
    /// family. Panics if `cf_name` does not exist.
    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    /// Like [`Self::compact_range`], but also forces recompaction of the
    /// bottommost level.
    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    /// Handle to this map's column family; panics only if the CF was
    /// dropped after the map was created (its existence was verified at
    /// construction).
    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    /// Flushes this column family's memtables to disk.
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    /// Dynamically updates options on this column family.
    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    /// Reads an integer RocksDB property, clamped into `i64` range; a
    /// missing property reads as 0.
    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    /// Multi-get returning pinned (zero-copy) slices, with latency/byte
    /// metrics and optional perf-context sampling.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Only a sampled fraction of calls pays for perf-context collection.
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                false,
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    /// Exports one round of per-CF gauges from RocksDB properties; a failed
    /// property read is recorded as `METRICS_ERROR` (-1), never propagated.
    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let Some(cf) = rocksdb.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                rocksdb.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        // Sum file counts across LSM levels 0..=6.
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    /// Creates an on-disk checkpoint of the whole database at `path`.
    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    /// Full scan of the column family producing counts, byte totals, and
    /// key/value size histograms. O(n) in table size — use for tooling only.
    pub fn table_summary(&self) -> eyre::Result<TableSummary>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        for item in self.safe_iter() {
            let (key, value) = item?;
            num_keys += 1;
            // Re-serialize to measure the on-disk encoding sizes.
            let key_len = be_fix_int_ser(key.borrow())?.len();
            let value_len = bcs::to_bytes(value.borrow())?.len();
            key_bytes_total += key_len;
            value_bytes_total += value_len;
            key_hist.record(key_len as u64)?;
            value_hist.record(value_len as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    /// Bundles the metrics handles (latency timer, bytes/keys counters, and
    /// optional sampled perf context) that iterators report through.
    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    /// Read options restricted to `[lower_bound, upper_bound)` in serialized
    /// key order; `None` leaves that side unbounded.
    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

    /// Reverse iterator over `[lower_bound, upper_bound]` (both inclusive
    /// when given), wired up with the usual iterator metrics.
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<SafeRevIter<'_, K, V>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        // Serialized inclusive upper bound; the reverse iterator starts here.
        let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
        let readopts = self.create_read_options_with_range((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));

        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        let iter = SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        );
        Ok(SafeRevIter::new(iter, upper_bound_key.transpose()?))
    }

    /// Translates a Rust `RangeBounds<K>` into RocksDB iterate bounds
    /// (which are inclusive-lower / exclusive-upper over serialized keys).
    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                // RocksDB lower bounds are inclusive: bump to the next key.
                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                // RocksDB upper bounds are exclusive: bump to the next key.
                // If the key is already all-0xFF there is no successor, so
                // leave the range open at the top instead.
                if !is_max(&key_buf) {
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}
1137
/// An accumulated set of writes (puts, deletes, range deletes) that is
/// committed atomically against a single database via [`DBBatch::write`].
pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: rocksdb::WriteBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}
1211
impl DBBatch {
    /// Wraps a `WriteBatch` together with the database it must be committed
    /// to and the metrics/sampling used at commit time.
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: rocksdb::WriteBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Atomically commits the batch, recording commit latency and size, and
    /// flagging commits that take longer than one second.
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        // Capture the size before `write` consumes the batch.
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    /// Current serialized size of the pending batch in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}
1282
impl DBBatch {
    /// Queues deletions for the given keys in `db`'s column family.
    /// Rejects maps belonging to a different database instance with
    /// `CrossDBBatch`.
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);

                Ok(())
            })?;
        Ok(())
    }

    /// Queues deletion of the serialized key range from `from` to `to` in
    /// `db`'s column family. Rejects maps belonging to a different database
    /// instance with `CrossDBBatch`.
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from)?;
        let to_buf = be_fix_int_ser(to)?;

        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf);
        Ok(())
    }

    /// Queues puts for the given key/value pairs, recording the total
    /// serialized size in metrics; returns `&mut self` for chaining.
    /// Rejects maps belonging to a different database instance with
    /// `CrossDBBatch`.
    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                self.batch.put_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }
}
1357
/// Raw (byte-level) RocksDB iterator over a multi-threaded DB handle, as
/// consumed by the typed `SafeIter` wrappers.
pub type RocksDBRawIter<'a> =
    rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>;
1360
impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;
    type SafeIterator = SafeIter<'a, K, V>;

    /// Returns true iff `key` is present in the table.
    ///
    /// `key_may_exist_cf` is consulted first as a cheap pre-filter (it may
    /// return false positives but never false negatives); the actual pinned
    /// point lookup is only paid on a "maybe".
    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        let readopts = self.opts.readopts();
        Ok(self
            .rocksdb
            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
            && self
                .rocksdb
                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .is_some())
    }

    /// Returns, for each input key, whether it is present in the table
    /// (implemented on top of one multi-get).
    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    /// Fetches and deserializes the value stored under `key`, if any.
    ///
    /// Records read latency and value size metrics; perf-context metrics are
    /// only collected on sampled calls (`get_sample_interval`).
    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let res = self
            .rocksdb
            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?;
        // A miss is recorded as a 0-byte read.
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    /// Serializes and writes one `(key, value)` pair.
    ///
    /// Records put latency and byte-size metrics, and emits a warning plus
    /// dedicated counters for writes slower than one second.
    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.rocksdb
            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;

        let elapsed = timer.stop_and_record();
        // Anything above 1s is pathological for a single put; surface it.
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    /// Deletes the entry under `key` (a no-op at the RocksDB level when the
    /// key is absent), recording delete latency and count metrics.
    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        self.rocksdb
            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    /// Clears the table by dropping and recreating its column family.
    ///
    /// The `drop_cf` result is intentionally ignored so that clearing an
    /// already-missing column family still succeeds.
    /// NOTE(review): the recreated CF uses `default_db_options()` rather than
    /// the table's original per-table options — confirm this is intended.
    #[instrument(level = "trace", skip_all, err)]
    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
        let _ = self.rocksdb.drop_cf(&self.cf);
        self.rocksdb
            .create_cf(self.cf.clone(), &default_db_options().options)
            .map_err(typed_store_err_from_rocks_err)?;
        Ok(())
    }

    /// Schedules deletion of the table's contents via a single range delete
    /// from the first to the last currently-present key.
    ///
    /// NOTE(review): RocksDB's `delete_range` end key is exclusive, so the
    /// entry at `last_key` itself appears to survive this call — confirm
    /// whether that is the intended semantics here.
    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
        let last_key = self
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(k, _v)| k);
        // An empty table yields no (first, last) pair; nothing to delete.
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

    /// Returns true when the table holds no entries (scans at most one key).
    fn is_empty(&self) -> bool {
        self.safe_iter().next().is_none()
    }

    /// Iterates the whole table in key order, yielding deserialized pairs
    /// wrapped in `Result` (hence "safe"); scan metrics are attached.
    fn safe_iter(&'a self) -> Self::SafeIterator {
        let db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    /// Same as `safe_iter`, restricted to the optional `[lower_bound,
    /// upper_bound)` key range applied via RocksDB read options.
    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    /// Same as `safe_iter`, restricted to an arbitrary Rust `RangeBounds`
    /// over the key type.
    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_range(range);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    /// Fetches and deserializes many keys at once; output order matches the
    /// input order, with `None` for absent keys.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

    /// Writes many `(key, value)` pairs atomically via a single batch.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

    /// Deletes many keys atomically via a single batch.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

    /// For secondary instances: replays the primary's latest writes so reads
    /// observe up-to-date data.
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        self.rocksdb
            .try_catch_up_with_primary()
            .map_err(typed_store_err_from_rocks_err)
    }
}
1658
1659pub fn read_size_from_env(var_name: &str) -> Option<usize> {
1660 env::var(var_name)
1661 .ok()?
1662 .parse::<usize>()
1663 .tap_err(|e| {
1664 warn!(
1665 "Env var {} does not contain valid usize integer: {}",
1666 var_name, e
1667 )
1668 })
1669 .ok()
1670}
1671
/// Per-table read/write behavior applied to every operation (see `readopts`
/// and `writeopts`).
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    // When true, reads skip range-deletion tombstones (faster reads; a read
    // may then observe keys covered by a pending range delete).
    pub ignore_range_deletions: bool,
    // When true, every write is synced to disk before returning; initialized
    // from the IOTA_DB_SYNC_TO_DISK env var in `Default`.
    sync_to_disk: bool,
}
1678
1679impl ReadWriteOptions {
1680 pub fn readopts(&self) -> ReadOptions {
1681 let mut readopts = ReadOptions::default();
1682 readopts.set_ignore_range_deletions(self.ignore_range_deletions);
1683 readopts
1684 }
1685
1686 pub fn writeopts(&self) -> WriteOptions {
1687 let mut opts = WriteOptions::default();
1688 opts.set_sync(self.sync_to_disk);
1689 opts
1690 }
1691
1692 pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
1693 self.ignore_range_deletions = ignore;
1694 self
1695 }
1696}
1697
1698impl Default for ReadWriteOptions {
1699 fn default() -> Self {
1700 Self {
1701 ignore_range_deletions: true,
1702 sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
1703 }
1704 }
1705}
/// Per-table RocksDB configuration: the raw `rocksdb::Options` plus the
/// read/write options applied to every operation on the table.
#[derive(Default, Clone)]
pub struct DBOptions {
    pub options: rocksdb::Options,
    pub rw_options: ReadWriteOptions,
}
1713
impl DBOptions {
    /// Tunes the table for point lookups, backed by a block cache of
    /// `block_cache_size_mb` (delegates to RocksDB's built-in preset).
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }

    /// Tunes the table for large, rarely-scanned values by moving values of
    /// at least `min_blob_size` bytes into separate blob files (BlobDB) with
    /// LZ4 compression and blob garbage collection enabled.
    ///
    /// No-op when the ENV_VAR_DISABLE_BLOB_STORAGE env var is set.
    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
            info!("Large value blob storage optimization is disabled via env var.");
            return self;
        }

        self.options.set_enable_blob_files(true);
        self.options
            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
        self.options.set_enable_blob_gc(true);
        self.options.set_min_blob_size(min_blob_size);

        // Memtable size, env-overridable in MB.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        // 64 MiB SST target size.
        let target_file_size_base = 64 << 20;
        self.options
            .set_target_file_size_base(target_file_size_base);
        // Size L1 to hold roughly one L0 compaction's worth of files.
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options
            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);

        self
    }

    /// Tunes the table for read-heavy workloads: block-based tables with a
    /// `block_cache_size_mb` LRU cache and 16 KiB blocks.
    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
        self
    }

    /// Sets DB-wide (not per-CF) write limits: the shared memtable budget and
    /// the total WAL cap, both `db_max_write_buffer_gb` GiB.
    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
        self.options
            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
        self.options
            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
        self
    }

    /// Tunes the table for write-heavy workloads under level compaction:
    /// larger/more memtables, relaxed L0 triggers, and L1 sized to match the
    /// expected L0 volume. Most knobs are env-overridable.
    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
        // Memtable size and count.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        // Keep one flushed memtable's worth of data around for conflict
        // checking / snapshot reads.
        self.options
            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());

        // L0 file-count thresholds: compaction at 1x, write slowdown at 12x,
        // write stop at 16x.
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        // SST target size, env-overridable in MB.
        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        // Size L1 to absorb a full L0 compaction.
        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Like `optimize_for_write_throughput`, but for tables that (almost)
    /// never delete: switches to universal compaction, which reduces write
    /// amplification at the cost of space amplification.
    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
        // Memtable size and count.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());

        // Universal compaction with a very permissive size-amplification
        // bound (10000%) and "similar size" stop style.
        self.options
            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
        compaction_options.set_max_size_amplification_percent(10000);
        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
        self.options
            .set_universal_compaction_options(&compaction_options);

        // Universal compaction tolerates far more L0 files (default 80).
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        // SST target size, env-overridable in MB.
        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Overrides the block-based table options (block cache size and block
    /// size) for this table.
    pub fn set_block_options(
        mut self,
        block_cache_size_mb: usize,
        block_size_bytes: usize,
    ) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(
                block_cache_size_mb,
                block_size_bytes,
            ));
        self
    }

    /// Disables RocksDB's write throttling triggered by pending compaction
    /// debt (both the soft slowdown and hard stop limits).
    pub fn disable_write_throttling(mut self) -> DBOptions {
        self.options.set_soft_pending_compaction_bytes_limit(0);
        self.options.set_hard_pending_compaction_bytes_limit(0);
        self
    }
}
1898
/// Base RocksDB options shared by all tables; the tuning methods on
/// `DBOptions` build on top of these defaults.
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    // Raise the process fd limit and grant this DB a fraction of it, so that
    // many tables can coexist in one process without exhausting descriptors.
    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    // Shard the table cache to reduce lock contention across threads.
    opt.set_table_cache_num_shard_bits(10);

    // LZ4 for upper levels (fast), Zstd with a trained dictionary for the
    // bottommost level (better ratio on cold data).
    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    // DB-wide memtable budget and WAL size cap, both env-overridable (MB).
    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    // Background flush/compaction parallelism (env-overridable, default 8).
    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    // 128 MiB block cache with 16 KiB blocks.
    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    // Small prefix bloom to speed up memtable point lookups.
    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}
1958
1959fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
1960 let mut block_options = BlockBasedOptions::default();
1965 block_options.set_block_size(block_size_bytes);
1967 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
1969 block_options.set_bloom_filter(10.0, false);
1971 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
1973 block_options
1974}
1975
1976#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
1979pub fn open_cf<P: AsRef<Path>>(
1980 path: P,
1981 db_options: Option<rocksdb::Options>,
1982 metric_conf: MetricConf,
1983 opt_cfs: &[&str],
1984) -> Result<Arc<RocksDB>, TypedStoreError> {
1985 let options = db_options.unwrap_or_else(|| default_db_options().options);
1986 let column_descriptors: Vec<_> = opt_cfs
1987 .iter()
1988 .map(|name| (*name, options.clone()))
1989 .collect();
1990 open_cf_opts(
1991 path,
1992 Some(options.clone()),
1993 metric_conf,
1994 &column_descriptors[..],
1995 )
1996}
1997
1998fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
1999 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2001 options.create_if_missing(true);
2002 options.create_missing_column_families(true);
2003 options
2004}
2005
2006#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2009pub fn open_cf_opts<P: AsRef<Path>>(
2010 path: P,
2011 db_options: Option<rocksdb::Options>,
2012 metric_conf: MetricConf,
2013 opt_cfs: &[(&str, rocksdb::Options)],
2014) -> Result<Arc<RocksDB>, TypedStoreError> {
2015 let path = path.as_ref();
2016 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2026 nondeterministic!({
2027 let options = prepare_db_options(db_options);
2028 let rocksdb = {
2029 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2030 &options,
2031 path,
2032 cfs.into_iter()
2033 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2034 )
2035 .map_err(typed_store_err_from_rocks_err)?
2036 };
2037 Ok(Arc::new(RocksDB::new(
2038 rocksdb,
2039 metric_conf,
2040 PathBuf::from(path),
2041 )))
2042 })
2043}
2044
/// Opens the database at `primary_path` in secondary (read-only follower)
/// mode.
///
/// The secondary instance keeps its own state under `secondary_path`,
/// defaulting to a sibling "SECONDARY" directory next to the primary, and is
/// caught up with the primary once before being returned.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        // A secondary must be able to keep every SST file open to stay in
        // sync, so raise the fd limit and remove the open-files cap.
        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        // Every column family present on disk must be opened; back-fill any
        // the caller did not configure with default options.
        let default_db_options = default_db_options();
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        // Default secondary state dir: "<primary parent>/SECONDARY".
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            // Replay the primary's latest state before handing the DB out.
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(RocksDB::new(rocksdb, metric_conf, secondary_path)))
    })
}
2105
2106pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2107 const DB_DEFAULT_CF_NAME: &str = "default";
2108
2109 let opts = rocksdb::Options::default();
2110 rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2111 .map_err(|e| e.into())
2112 .map(|q| {
2113 q.iter()
2114 .filter_map(|s| {
2115 if s != DB_DEFAULT_CF_NAME {
2117 Some(s.clone())
2118 } else {
2119 None
2120 }
2121 })
2122 .collect()
2123 })
2124}
2125
2126#[inline]
2128pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2129where
2130 S: ?Sized + serde::Serialize,
2131{
2132 bincode::DefaultOptions::new()
2133 .with_big_endian()
2134 .with_fixint_encoding()
2135 .serialize(t)
2136 .map_err(typed_store_err_from_bincode_err)
2137}
2138
2139#[derive(Clone)]
2140pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
2141impl DBMapTableConfigMap {
2142 pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
2143 Self(map)
2144 }
2145
2146 pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
2147 self.0.clone()
2148 }
2149}
2150
/// How a RocksDB instance is opened: as the primary (read-write) instance,
/// or as a secondary (read-only follower) with an optional dedicated
/// directory for the follower's own state.
pub enum RocksDBAccessType {
    Primary,
    Secondary(Option<PathBuf>),
}
2155
2156pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
2158 let mut backoff = backoff::ExponentialBackoff {
2159 max_elapsed_time: Some(timeout),
2160 ..Default::default()
2161 };
2162 loop {
2163 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2164 Ok(()) => return Ok(()),
2165 Err(err) => match backoff.next_backoff() {
2166 Some(duration) => tokio::time::sleep(duration).await,
2167 None => return Err(err),
2168 },
2169 }
2170 }
2171}
2172
2173fn populate_missing_cfs(
2174 input_cfs: &[(&str, rocksdb::Options)],
2175 path: &Path,
2176) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2177 let mut cfs = vec![];
2178 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2179 let existing_cfs =
2180 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2181 .ok()
2182 .unwrap_or_default();
2183
2184 for cf_name in existing_cfs {
2185 if !input_cf_index.contains(&cf_name[..]) {
2186 cfs.push((cf_name, rocksdb::Options::default()));
2187 }
2188 }
2189 cfs.extend(
2190 input_cfs
2191 .iter()
2192 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2193 );
2194 Ok(cfs)
2195}
2196
/// Increments, in place, a big-endian unsigned integer represented as a byte
/// slice, saturating at the all-0xFF maximum instead of wrapping to zero.
///
/// An empty slice is treated as already-maximal and left unchanged.
fn big_endian_saturating_add_one(v: &mut [u8]) {
    // Saturate: an all-0xFF value has no representable successor.
    if v.iter().all(|&b| b == u8::MAX) {
        return;
    }
    // Ripple the carry from the least-significant (last) byte upward; the
    // saturation check above guarantees the carry terminates in-bounds.
    // Iterating with iter_mut() avoids the per-iteration bounds checks of
    // the previous index-based loop.
    for byte in v.iter_mut().rev() {
        if *byte == u8::MAX {
            *byte = 0;
        } else {
            *byte += 1;
            break;
        }
    }
}
2213
/// Returns true when every byte of `v` is 0xFF, i.e. the big-endian value is
/// at its maximum. An empty slice is vacuously maximal.
fn is_max(v: &[u8]) -> bool {
    !v.iter().any(|&b| b != u8::MAX)
}
2218
// Sanity tests for the big-endian helpers above, cross-checked against a
// 256-bit integer type generated by `uint::construct_uint!`.
#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
#[test]
fn test_helpers() {
    // Empty input is treated as maximal.
    let v = vec![];
    assert!(is_max(&v));

    // Incrementing the byte representation must agree with arithmetic +1 on
    // the 256-bit integer interpretation.
    fn check_add(v: Vec<u8>) {
        let mut v = v;
        let num = Num32::from_big_endian(&v);
        big_endian_saturating_add_one(&mut v);
        assert!(num + 1 == Num32::from_big_endian(&v));
    }

    // 4 x 64-bit words = 256-bit unsigned integer.
    uint::construct_uint! {
        struct Num32(4);
    }

    // The all-0xFF value saturates: it must remain the maximum.
    let mut v = vec![255; 32];
    big_endian_saturating_add_one(&mut v);
    assert!(Num32::MAX == Num32::from_big_endian(&v));

    check_add(vec![1; 32]);
    check_add(vec![6; 32]);
    check_add(vec![254; 32]);
}