pub mod errors;
pub(crate) mod iter;
pub(crate) mod safe_iter;

use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashSet},
    env,
    ffi::CStr,
    marker::PhantomData,
    ops::{Bound, RangeBounds},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use bincode::Options;
use collectable::TryExtend;
use iota_macros::{fail_point, nondeterministic};
use prometheus::{Histogram, HistogramTimer};
use rocksdb::{
    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
};
use serde::{Serialize, de::DeserializeOwned};
use tap::TapFallible;
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};

use self::iter::Iter;
use crate::{
    TypedStoreError,
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    rocks::{
        errors::{
            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
            typed_store_err_from_rocks_err,
        },
        safe_iter::{SafeIter, SafeRevIter},
    },
    traits::{Map, TableSummary},
};

const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";

#[cfg(test)]
mod tests;

#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
            ),*
        )
    };
}
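
// Illustrative only: a minimal sketch of `reopen!` over two hypothetical
// column families ("users" and "balances" are not part of this crate).
// Compiled for tests only so it cannot affect production builds.
#[cfg(test)]
#[expect(dead_code)]
fn reopen_macro_example(db: &Arc<RocksDB>) -> (DBMap<u64, String>, DBMap<u64, u64>) {
    // Expands to one `DBMap::reopen(db, Some(cf), ..)` call per column family
    // and panics with "Cannot open <cf> CF." if a handle is missing.
    reopen!(db, "users";<u64, String>, "balances";<u64, u64>)
}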

#[macro_export]
macro_rules! retry_transaction {
    ($transaction:expr) => {
        retry_transaction!($transaction, Some(20))
    };

    (
        $transaction:expr,
        $max_retries:expr $(,)?
    ) => {{
        use std::time::Duration;

        use rand::{
            distributions::{Distribution, Uniform},
            rngs::ThreadRng,
        };
        use tracing::{error, info};

        let mut retries = 0;
        let max_retries = $max_retries;
        loop {
            let status = $transaction;
            match status {
                Err(TypedStoreError::RetryableTransaction) => {
                    retries += 1;
                    let delay = {
                        let mut rng = ThreadRng::default();
                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
                    };
                    if let Some(max_retries) = max_retries {
                        if retries > max_retries {
                            error!(?max_retries, "max retries exceeded");
                            break status;
                        }
                    }
                    if retries > 10 {
                        error!(?delay, ?retries, "excessive transaction retries...");
                    } else {
                        info!(
                            ?delay,
                            ?retries,
                            "transaction write conflict detected, sleeping"
                        );
                    }
                    std::thread::sleep(delay);
                }
                _ => break status,
            }
        }
    }};
}

#[macro_export]
macro_rules! retry_transaction_forever {
    ($transaction:expr) => {
        $crate::retry_transaction!($transaction, None)
    };
}
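
// Illustrative only: how the retry macros are meant to wrap a transaction
// body. The counter table and key are hypothetical. The wrapped expression is
// re-evaluated from scratch on every `RetryableTransaction`, with a random
// 0-50ms backoff between attempts; `Some(3)` caps the retries.
#[cfg(test)]
#[expect(dead_code)]
fn retry_transaction_example(db: &DBMap<u64, u64>) -> Result<(), TypedStoreError> {
    retry_transaction!(
        {
            let mut txn = db.transaction()?;
            // `get_for_update` registers the read so a conflicting concurrent
            // commit surfaces as RetryableTransaction from `commit()`.
            let balance = txn.get_for_update(db, &1)?.unwrap_or(0);
            txn.insert_batch(db, [(1u64, balance + 1)])?;
            txn.commit()
        },
        Some(3)
    )
}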

#[derive(Debug)]
pub struct DBWithThreadModeWrapper {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl DBWithThreadModeWrapper {
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for DBWithThreadModeWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub struct OptimisticTransactionDBWrapper {
    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl OptimisticTransactionDBWrapper {
    fn new(
        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for OptimisticTransactionDBWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub enum RocksDB {
    DBWithThreadMode(DBWithThreadModeWrapper),
    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
}

macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
        }
    }
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        delegate_call!(self.cancel_all_background_work(true))
    }
}

impl RocksDB {
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        delegate_call!(self.get(key))
    }

    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        delegate_call!(self.multi_get_cf_opt(keys, readopts))
    }

    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
    }

    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        delegate_call!(self.property_int_value_cf(cf, name))
    }

    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
    }

    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        delegate_call!(self.cf_handle(name))
    }

    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.create_cf(name, opts))
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        delegate_call!(self.drop_cf(name))
    }

    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
        fail_point!("delete-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn path(&self) -> &Path {
        delegate_call!(self.path())
    }

    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
        fail_point!("put-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
    }

    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        delegate_call!(self.try_catch_up_with_primary())
    }

    pub fn write(
        &self,
        batch: RocksDBBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (self, batch) {
            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            _ => Err(TypedStoreError::RocksDB(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn transaction_without_snapshot(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported by OptimisticTransactionDB")
            }
        }
    }

    pub fn transaction(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => {
                let mut tx_opts = OptimisticTransactionOptions::new();
                tx_opts.set_snapshot(true);

                Ok(db
                    .underlying
                    .transaction_opt(&WriteOptions::default(), &tx_opts))
            }
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported by OptimisticTransactionDB")
            }
        }
    }

    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
            }
            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
            ),
        }
    }

    pub fn iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
        mode: IteratorMode<'_>,
    ) -> RocksDBIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
            }
            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
            ),
        }
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        delegate_call!(self.compact_range_cf(cf, start, end))
    }

    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
        match self {
            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
            Self::OptimisticTransactionDB(d) => {
                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
            }
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint = match self {
            Self::DBWithThreadMode(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
            Self::OptimisticTransactionDB(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
        };
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        delegate_call!(self.flush_cf(cf))
    }

    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.set_options_cf(cf, opts))
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
        }
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
        }
    }

    pub fn db_name(&self) -> String {
        let name = match self {
            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
        };
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        delegate_call!(self.live_files())
    }
}

pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;

    db.get(DB_CORRUPTED_KEY)
        .map_err(|e| format!("Failed to read from database: {e}"))
        .and_then(|value| match value {
            Some(v) if v.first() == Some(&1) => Err(
                "Database is corrupted, please remove the current database and start clean!"
                    .to_string(),
            ),
            Some(_) => Ok(()),
            None => db
                .put(DB_CORRUPTED_KEY, [1])
                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
        })?;

    Ok(())
}

pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
}
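
// Illustrative only: the intended crash-marker handshake, assuming `db_path`
// holds the default column family used above. A fresh database gets the
// marker written on first start; if the process dies before the marker is
// cleared, the next `check_and_mark_db_corruption` call fails until recovery
// calls `unmark_db_corruption`.
#[cfg(test)]
#[expect(dead_code)]
fn corruption_marker_example(db_path: &Path) -> Result<(), String> {
    check_and_mark_db_corruption(db_path)?;
    // ... run the node; then, after a clean shutdown or successful recovery:
    unmark_db_corruption(db_path).map_err(|e| e.to_string())
}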

pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}

impl<'a> RocksDBSnapshot<'a> {
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }

    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}

pub enum RocksDBBatch {
    Regular(rocksdb::WriteBatch),
    Transactional(rocksdb::WriteBatchWithTransaction<true>),
}

macro_rules! delegate_batch_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::Regular(b) => b.$method($($args),*),
            Self::Transactional(b) => b.$method($($args),*),
        }
    }
}

impl RocksDBBatch {
    fn size_in_bytes(&self) -> usize {
        delegate_batch_call!(self.size_in_bytes())
    }

    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        delegate_batch_call!(self.delete_cf(cf, key))
    }

    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.put_cf(cf, key, value))
    }

    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.merge_cf(cf, key, value))
    }

    pub fn delete_range_cf<K: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), TypedStoreError> {
        match self {
            Self::Regular(batch) => {
                batch.delete_range_cf(cf, from, to);
                Ok(())
            }
            Self::Transactional(_) => {
                panic!("delete_range_cf is not supported for transactional batches")
            }
        }
    }
}

#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}
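
// Illustrative only: a sketch of wiring up metrics for a database named
// "example_db" (hypothetical). `with_sampling` overrides only the read
// interval and resets write/iter sampling to their defaults.
#[cfg(test)]
#[expect(dead_code)]
fn metric_conf_example() -> MetricConf {
    MetricConf::new("example_db").with_sampling(SamplingInterval::default())
}
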
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    _phantom: PhantomData<fn(K) -> V>,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }

    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

    pub fn batch(&self) -> DBBatch {
        let batch = match *self.rocksdb {
            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
            RocksDB::OptimisticTransactionDB(_) => {
                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
            }
        };
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
        self.rocksdb
            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                false, // sorted_input: keys are not guaranteed to be pre-sorted
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let Some(cf) = rocksdb.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                rocksdb.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new(&self.rocksdb)
    }

    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new_without_snapshot(&self.rocksdb)
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
        Ok(self.rocksdb.snapshot())
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let iter = self.iterator_cf().map(Result::unwrap);
        for (key, value) in iter {
            num_keys += 1;
            key_bytes_total += key.len();
            value_bytes_total += value.len();
            key_hist.record(key.len() as u64)?;
            value_hist.record(value.len() as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<SafeRevIter<'_, K, V>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
        let readopts = self.create_read_options_with_range((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));

        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        let iter = SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        );
        Ok(SafeRevIter::new(iter, upper_bound_key.transpose()?))
    }

    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                if !is_max(&key_buf) {
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}
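
// Illustrative only: opening a standalone typed map; the path and column
// family name are hypothetical, and `None` keeps the default DB options.
// Keys are serialized with an order-preserving big-endian fixed-int encoding
// (`be_fix_int_ser`), which is what allows the range-bound code above to do
// byte-wise "+1" arithmetic on encoded keys.
#[cfg(test)]
#[expect(dead_code)]
fn open_example(dir: &Path) -> Result<DBMap<u64, String>, TypedStoreError> {
    DBMap::open(
        dir.join("example_db"),
        MetricConf::new("example_db"),
        None,
        Some("example_cf"),
        &ReadWriteOptions::default(),
    )
}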

pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: RocksDBBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: RocksDBBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}

impl DBBatch {
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);
                Ok(())
            })?;
        Ok(())
    }

    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from)?;
        let to_buf = be_fix_int_ser(to)?;

        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                self.batch.put_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }
}
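
// Illustrative only: composing one atomic batch across two hypothetical maps
// backed by the same database; maps from a different database would fail the
// `Arc::ptr_eq` check above with `TypedStoreError::CrossDBBatch`.
#[cfg(test)]
#[expect(dead_code)]
fn batch_example(
    users: &DBMap<u64, String>,
    tombstones: &DBMap<u64, u64>,
) -> Result<(), TypedStoreError> {
    let mut batch = users.batch();
    batch.insert_batch(users, [(1u64, "alice".to_string())])?;
    batch.delete_batch(tombstones, [42u64])?;
    // All staged writes land atomically, or not at all.
    batch.write()
}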

pub struct DBTransaction<'a> {
    rocksdb: Arc<RocksDB>,
    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
}

impl<'a> DBTransaction<'a> {
    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction()?,
        })
    }

    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction_without_snapshot()?,
        })
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.transaction
                    .put_cf(&db.cf(), k_buf, v_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.transaction
                    .delete_cf(&db.cf(), k_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn snapshot(
        &self,
    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
    {
        self.transaction.snapshot()
    }

    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let k_buf = be_fix_int_ser(key)?;
        match self
            .transaction
            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?
        {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        self.transaction
            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
    }

    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError> {
        let cf = db.cf();
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
            .collect();

        let results = self
            .transaction
            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());

        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(
                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
                    Some(data) => Ok(Some(
                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                    )),
                    None => Ok(None),
                },
            )
            .collect();

        values_parsed
    }

    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Iter<'a, K, V> {
        let db_iter = self
            .transaction
            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
        Iter::new(
            db.cf.clone(),
            RocksDBRawIter::OptimisticTransaction(db_iter),
            None,
            None,
            None,
            None,
            None,
        )
    }

    pub fn commit(self) -> Result<(), TypedStoreError> {
        fail_point!("transaction-commit");
        self.transaction.commit().map_err(|e| match e.kind() {
            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
            _ => typed_store_err_from_rocks_err(e),
        })?;
        Ok(())
    }
}
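
// Illustrative only: a read-modify-write sketch against a hypothetical
// counter table, without taking a snapshot. A concurrent commit touching the
// same key makes `commit()` return `RetryableTransaction`, which is what the
// `retry_transaction!` macro above keys on.
#[cfg(test)]
#[expect(dead_code)]
fn increment_example(counts: &DBMap<String, u64>) -> Result<(), TypedStoreError> {
    let mut txn = counts.transaction_without_snapshot()?;
    let current = txn.get_for_update(counts, &"hits".to_string())?.unwrap_or(0);
    txn.insert_batch(counts, [("hits".to_string(), current + 1)])?;
    txn.commit()
}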

macro_rules! delegate_iter_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DB(db) => db.$method($($args),*),
            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
            Self::OptimisticTransaction(db) => db.$method($($args),*),
        }
    }
}

pub enum RocksDBRawIter<'a> {
    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
    OptimisticTransaction(
        rocksdb::DBRawIteratorWithThreadMode<
            'a,
            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
        >,
    ),
}

impl RocksDBRawIter<'_> {
    pub fn valid(&self) -> bool {
        delegate_iter_call!(self.valid())
    }
    pub fn key(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.key())
    }
    pub fn value(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.value())
    }
    pub fn next(&mut self) {
        delegate_iter_call!(self.next())
    }
    pub fn prev(&mut self) {
        delegate_iter_call!(self.prev())
    }
    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek(key))
    }
    pub fn seek_to_last(&mut self) {
        delegate_iter_call!(self.seek_to_last())
    }
    pub fn seek_to_first(&mut self) {
        delegate_iter_call!(self.seek_to_first())
    }
    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek_for_prev(key))
    }
    pub fn status(&self) -> Result<(), rocksdb::Error> {
        delegate_iter_call!(self.status())
    }
}

pub enum RocksDBIter<'a> {
    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
}

impl Iterator for RocksDBIter<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::DB(db) => db.next(),
            Self::OptimisticTransactionDB(db) => db.next(),
        }
    }
}

impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;
    type Iterator = Iter<'a, K, V>;
    type SafeIterator = SafeIter<'a, K, V>;

    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        let readopts = self.opts.readopts();
        Ok(self
            .rocksdb
            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
            && self
                .rocksdb
                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .is_some())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let res = self
            .rocksdb
            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.rocksdb
            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;

        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        self.rocksdb
            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
        let _ = self.rocksdb.drop_cf(&self.cf);
        self.rocksdb
            .create_cf(self.cf.clone(), &default_db_options().options)
            .map_err(typed_store_err_from_rocks_err)?;
        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let first_key = self.unbounded_iter().next().map(|(k, _v)| k);
        let last_key = self
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

2061 fn is_empty(&self) -> bool {
2062 self.safe_iter().next().is_none()
2063 }
2064
2065 fn unbounded_iter(&'a self) -> Self::Iterator {
2068 let db_iter = self
2069 .rocksdb
2070 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2071 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2072 Iter::new(
2073 self.cf.clone(),
2074 db_iter,
2075 _timer,
2076 _perf_ctx,
2077 bytes_scanned,
2078 keys_scanned,
2079 Some(self.db_metrics.clone()),
2080 )
2081 }
2082
2083 fn iter_with_bounds(
2087 &'a self,
2088 lower_bound: Option<K>,
2089 upper_bound: Option<K>,
2090 ) -> Self::Iterator {
2091 let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2092 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2093 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2094 Iter::new(
2095 self.cf.clone(),
2096 db_iter,
2097 _timer,
2098 _perf_ctx,
2099 bytes_scanned,
2100 keys_scanned,
2101 Some(self.db_metrics.clone()),
2102 )
2103 }
2104
2105 fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2108 let readopts = self.create_read_options_with_range(range);
2109 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2110 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2111 Iter::new(
2112 self.cf.clone(),
2113 db_iter,
2114 _timer,
2115 _perf_ctx,
2116 bytes_scanned,
2117 keys_scanned,
2118 Some(self.db_metrics.clone()),
2119 )
2120 }
2121
2122 fn safe_iter(&'a self) -> Self::SafeIterator {
2123 let db_iter = self
2124 .rocksdb
2125 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2126 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2127 SafeIter::new(
2128 self.cf.clone(),
2129 db_iter,
2130 _timer,
2131 _perf_ctx,
2132 bytes_scanned,
2133 keys_scanned,
2134 Some(self.db_metrics.clone()),
2135 )
2136 }
2137
2138 fn safe_iter_with_bounds(
2139 &'a self,
2140 lower_bound: Option<K>,
2141 upper_bound: Option<K>,
2142 ) -> Self::SafeIterator {
2143 let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2144 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2145 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2146 SafeIter::new(
2147 self.cf.clone(),
2148 db_iter,
2149 _timer,
2150 _perf_ctx,
2151 bytes_scanned,
2152 keys_scanned,
2153 Some(self.db_metrics.clone()),
2154 )
2155 }
2156
2157 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2158 let readopts = self.create_read_options_with_range(range);
2159 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2160 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2161 SafeIter::new(
2162 self.cf.clone(),
2163 db_iter,
2164 _timer,
2165 _perf_ctx,
2166 bytes_scanned,
2167 keys_scanned,
2168 Some(self.db_metrics.clone()),
2169 )
2170 }
2171
2172 #[instrument(level = "trace", skip_all, err)]
2174 fn multi_get<J>(
2175 &self,
2176 keys: impl IntoIterator<Item = J>,
2177 ) -> Result<Vec<Option<V>>, TypedStoreError>
2178 where
2179 J: Borrow<K>,
2180 {
2181 let results = self.multi_get_pinned(keys)?;
2182 let values_parsed: Result<Vec<_>, TypedStoreError> = results
2183 .into_iter()
2184 .map(|value_byte| match value_byte {
2185 Some(data) => Ok(Some(
2186 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2187 )),
2188 None => Ok(None),
2189 })
2190 .collect();
2191
2192 values_parsed
2193 }
2194
2195 #[instrument(level = "trace", skip_all, err)]
2197 fn multi_insert<J, U>(
2198 &self,
2199 key_val_pairs: impl IntoIterator<Item = (J, U)>,
2200 ) -> Result<(), Self::Error>
2201 where
2202 J: Borrow<K>,
2203 U: Borrow<V>,
2204 {
2205 let mut batch = self.batch();
2206 batch.insert_batch(self, key_val_pairs)?;
2207 batch.write()
2208 }
2209
2210 #[instrument(level = "trace", skip_all, err)]
2212 fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2213 where
2214 J: Borrow<K>,
2215 {
2216 let mut batch = self.batch();
2217 batch.delete_batch(self, keys)?;
2218 batch.write()
2219 }
2220
    /// Tries to catch up with the primary when this instance was opened as a
    /// read-only secondary.
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        self.rocksdb
            .try_catch_up_with_primary()
            .map_err(typed_store_err_from_rocks_err)
    }
}
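
// Usage sketch for the batched accessors above, assuming a hypothetical
// `table: DBMap<u64, String>` that was opened elsewhere:
//
//     table.multi_insert(vec![(1u64, "a".to_string()), (2, "b".to_string())])?;
//     let values = table.multi_get(vec![1u64, 3])?;
//     assert_eq!(values, vec![Some("a".to_string()), None]);
//     table.multi_remove(vec![1u64, 2])?;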

impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
where
    J: Borrow<K>,
    U: Borrow<V>,
    K: Serialize,
    V: Serialize,
{
    type Error = TypedStoreError;

    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
    where
        T: Iterator<Item = (J, U)>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, iter)?;
        batch.write()
    }

    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
        let mut batch = self.batch();
        batch.insert_batch(self, slice_of_refs)?;
        batch.write()
    }
}
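
// `TryExtend` lets generic code fill a `DBMap` from any iterator, writing a
// single batch underneath. A sketch, again assuming a hypothetical
// `table: DBMap<u64, String>`:
//
//     use collectable::TryExtend;
//     let mut pairs = [(1u64, "a".to_string()), (2, "b".to_string())].into_iter();
//     table.try_extend(&mut pairs)?;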

/// Reads a `usize` from the given environment variable, returning `None` (and
/// logging a warning) if the variable is unset or not a valid integer.
pub fn read_size_from_env(var_name: &str) -> Option<usize> {
    env::var(var_name)
        .ok()?
        .parse::<usize>()
        .tap_err(|e| {
            warn!(
                "Env var {} does not contain a valid usize integer: {}",
                var_name, e
            )
        })
        .ok()
}
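
// Behavior sketch ("EXAMPLE_BUFFER_MB" is a made-up variable name):
//
//     std::env::set_var("EXAMPLE_BUFFER_MB", "512");
//     assert_eq!(read_size_from_env("EXAMPLE_BUFFER_MB"), Some(512));
//     std::env::set_var("EXAMPLE_BUFFER_MB", "lots");
//     assert_eq!(read_size_from_env("EXAMPLE_BUFFER_MB"), None); // logs a warning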

#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    pub ignore_range_deletions: bool,
    /// Whether every write is synced to disk before being acknowledged; see
    /// the `IOTA_DB_SYNC_TO_DISK` handling in `Default::default`.
    sync_to_disk: bool,
}

impl ReadWriteOptions {
    pub fn readopts(&self) -> ReadOptions {
        let mut readopts = ReadOptions::default();
        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
        readopts
    }

    pub fn writeopts(&self) -> WriteOptions {
        let mut opts = WriteOptions::default();
        opts.set_sync(self.sync_to_disk);
        opts
    }

    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
        self.ignore_range_deletions = ignore;
        self
    }
}

impl Default for ReadWriteOptions {
    fn default() -> Self {
        Self {
            ignore_range_deletions: true,
            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
        }
    }
}
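
// A sketch of the builder-style usage (values are illustrative):
//
//     let rw = ReadWriteOptions::default().set_ignore_range_deletions(false);
//     let read_opts = rw.readopts();   // range deletions are now honored
//     let write_opts = rw.writeopts(); // syncs only if IOTA_DB_SYNC_TO_DISK is set (and not "0")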

/// Per-table options bundling RocksDB options with read/write options.
#[derive(Default, Clone)]
pub struct DBOptions {
    pub options: rocksdb::Options,
    pub rw_options: ReadWriteOptions,
}

impl DBOptions {
    /// Optimizes the options for tables dominated by point lookups.
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }

    /// Optimizes the options for tables storing large values that are rarely
    /// scanned, by moving values above `min_blob_size` into blob files
    /// (BlobDB), unless disabled via `DISABLE_BLOB_STORAGE`.
    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
            info!("Large value blob storage optimization is disabled via env var.");
            return self;
        }

        self.options.set_enable_blob_files(true);
        self.options
            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
        self.options.set_enable_blob_gc(true);
        self.options.set_min_blob_size(min_blob_size);

        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let target_file_size_base = 64 << 20;
        self.options
            .set_target_file_size_base(target_file_size_base);
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options
            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);

        self
    }

    /// Optimizes the options for read-heavy tables by enlarging the block
    /// cache.
    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
        self
    }

    /// Caps the total memtable and WAL footprint of the whole database.
    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
        self.options
            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
        self.options
            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
        self
    }

    /// Optimizes the options for write-heavy tables: larger and more numerous
    /// write buffers plus relaxed L0 compaction triggers.
    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Like `optimize_for_write_throughput`, but for tables with few
    /// deletions: switches to universal compaction, which trades space
    /// amplification for lower write amplification.
    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        self.options
            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
        compaction_options.set_max_size_amplification_percent(10000);
        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
        self.options
            .set_universal_compaction_options(&compaction_options);

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Overrides the block options with the given block cache size and block
    /// size.
    pub fn set_block_options(
        mut self,
        block_cache_size_mb: usize,
        block_size_bytes: usize,
    ) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(
                block_cache_size_mb,
                block_size_bytes,
            ));
        self
    }

    /// Disables write stalling and stopping based on pending compaction bytes.
    pub fn disable_write_throttling(mut self) -> DBOptions {
        self.options.set_soft_pending_compaction_bytes_limit(0);
        self.options.set_hard_pending_compaction_bytes_limit(0);
        self
    }
}
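
// Per-table configuration is typically built by chaining these helpers; a
// sketch (the threshold value is illustrative):
//
//     let opts = default_db_options()
//         .optimize_for_write_throughput()
//         .optimize_for_large_values_no_scan(1 << 20) // 1 MiB blob threshold
//         .disable_write_throttling();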

/// Builds the default `DBOptions` used when a table does not specify its own:
/// raised fd limit, sharded table cache, LZ4 compression (Zstd at the
/// bottommost level), pipelined writes, and env-var-tunable buffer/WAL sizes.
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    // If the process fd limit can be raised, let RocksDB keep up to 1/8 of it
    // as open files.
    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    opt.set_table_cache_num_shard_bits(10);

    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}

/// Builds block-based table options with an LRU block cache of
/// `block_cache_size_mb` MiB, the given block size, a 10-bits-per-key bloom
/// filter, and L0 filter/index blocks pinned in the cache.
fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
    let mut block_options = BlockBasedOptions::default();
    block_options.set_block_size(block_size_bytes);
    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
    block_options.set_bloom_filter(10.0, false);
    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
    block_options
}

/// Opens a database at the given path with the given column families, applying
/// the same options to every column family.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
pub fn open_cf<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[&str],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let options = db_options.unwrap_or_else(|| default_db_options().options);
    let column_descriptors: Vec<_> = opt_cfs
        .iter()
        .map(|name| (*name, options.clone()))
        .collect();
    open_cf_opts(
        path,
        Some(options.clone()),
        metric_conf,
        &column_descriptors[..],
    )
}
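
// Opening a database is typically done once at startup; a sketch (the path and
// CF names are illustrative, and a `MetricConf::default()` constructor is
// assumed to exist):
//
//     let db = open_cf(
//         "/tmp/typed-store-example",
//         None, // fall back to default_db_options()
//         MetricConf::default(),
//         &["users", "events"],
//     )?;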

fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    options
}

/// Opens a database at the given path, allowing per-column-family options.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    // In read-write mode RocksDB requires all existing column families to be
    // opened, so fill in any that were not explicitly requested.
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

/// Opens a database at the given path as an `OptimisticTransactionDB`,
/// enabling transactional access to the column families.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts_transactional<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
            &options,
            path,
            cfs.into_iter()
                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
        )
        .map_err(typed_store_err_from_rocks_err)?;
        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

/// Opens a database in read-only "secondary" mode, following the primary
/// instance at `primary_path`. If no secondary path is given, a `SECONDARY`
/// directory next to the primary is used.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        // Secondary instances are required to keep all files open
        // (max_open_files = -1) to stay in sync with the primary.
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add any existing column families that were not explicitly requested.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
        )))
    })
}
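
// Secondary-mode sketch: open a follower, then poll for new data (paths are
// illustrative and `MetricConf::default()` is assumed to exist):
//
//     let db = open_cf_opts_secondary(
//         "/var/db/primary",
//         Some("/var/db/follower"),
//         None,
//         MetricConf::default(),
//         &[("users", default_db_options().options)],
//     )?;
//     db.try_catch_up_with_primary()?; // re-run periodically to see new writes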

/// Lists the column families (tables) in the database at `path`, excluding
/// RocksDB's mandatory `default` column family.
pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
    const DB_DEFAULT_CF_NAME: &str = "default";

    let opts = rocksdb::Options::default();
    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
        .map_err(|e| e.into())
        .map(|q| {
            q.iter()
                .filter_map(|s| {
                    // The "default" column family is not a typed-store table.
                    if s != DB_DEFAULT_CF_NAME {
                        Some(s.clone())
                    } else {
                        None
                    }
                })
                .collect()
        })
}

/// Serializes a value with bincode using big-endian, fixed-width integer
/// encoding, so that encoded integers sort lexicographically in the same order
/// as their numeric values.
#[inline]
pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
where
    S: ?Sized + serde::Serialize,
{
    bincode::DefaultOptions::new()
        .with_big_endian()
        .with_fixint_encoding()
        .serialize(t)
        .map_err(typed_store_err_from_bincode_err)
}
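
// Why the big-endian fixint encoding matters: RocksDB orders keys by raw
// bytes, so byte order must agree with numeric order for range iterators and
// bounds to behave. A small sanity check (sketch):
#[test]
fn be_fix_int_ser_orders_like_integers() {
    let a = be_fix_int_ser(&1u64).unwrap();
    let b = be_fix_int_ser(&256u64).unwrap();
    // Fixint encoding: both values occupy the full 8 bytes.
    assert_eq!(a.len(), 8);
    assert_eq!(b.len(), 8);
    // Big-endian bytes compare the same way the numbers do.
    assert!(a < b);
}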

#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
        Self(map)
    }

    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
        self.0.clone()
    }
}

pub enum RocksDBAccessType {
    Primary,
    Secondary(Option<PathBuf>),
}

/// Destroys the database at the given path, releasing its files.
pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
}

/// Returns the requested column families plus any that already exist on disk
/// but were not requested (paired with default options), since RocksDB
/// requires all existing column families to be opened in read-write mode.
fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

/// Increments a big-endian byte string by one in place, saturating at the
/// all-`0xFF` maximum instead of wrapping around.
fn big_endian_saturating_add_one(v: &mut [u8]) {
    if is_max(v) {
        return;
    }
    for i in (0..v.len()).rev() {
        if v[i] == u8::MAX {
            v[i] = 0;
        } else {
            v[i] += 1;
            break;
        }
    }
}

/// Returns whether every byte is `0xFF`, i.e. the maximum value of this width.
fn is_max(v: &[u8]) -> bool {
    v.iter().all(|&x| x == u8::MAX)
}

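// A couple of concrete cases for the successor helper (sketch):
#[test]
fn big_endian_saturating_add_one_examples() {
    let mut v = vec![0x00, 0xFF];
    big_endian_saturating_add_one(&mut v);
    assert_eq!(v, vec![0x01, 0x00]); // the carry propagates left

    let mut max = vec![0xFF, 0xFF];
    big_endian_saturating_add_one(&mut max);
    assert_eq!(max, vec![0xFF, 0xFF]); // saturates at the maximum
}
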
#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
#[test]
fn test_helpers() {
    let v = vec![];
    assert!(is_max(&v));

    fn check_add(v: Vec<u8>) {
        let mut v = v;
        let num = Num32::from_big_endian(&v);
        big_endian_saturating_add_one(&mut v);
        assert!(num + 1 == Num32::from_big_endian(&v));
    }

    uint::construct_uint! {
        struct Num32(4);
    }

    let mut v = vec![255; 32];
    big_endian_saturating_add_one(&mut v);
    assert!(Num32::MAX == Num32::from_big_endian(&v));

    check_add(vec![1; 32]);
    check_add(vec![6; 32]);
    check_add(vec![254; 32]);
}