pub mod errors;
pub(crate) mod iter;
pub(crate) mod safe_iter;

use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashSet},
    env,
    ffi::CStr,
    marker::PhantomData,
    ops::{Bound, RangeBounds},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use bincode::Options;
use collectable::TryExtend;
use iota_macros::{fail_point, nondeterministic};
use prometheus::{Histogram, HistogramTimer};
use rocksdb::{
    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
};
use serde::{Serialize, de::DeserializeOwned};
use tap::TapFallible;
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};

use self::iter::Iter;
use crate::{
    TypedStoreError,
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    rocks::{
        errors::{
            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
            typed_store_err_from_rocks_err,
        },
        safe_iter::{SafeIter, SafeRevIter},
    },
    traits::{Map, TableSummary},
};

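// Several of the RocksDB tuning knobs below can be overridden at process
// startup via the named ENV_VAR_* environment variables (see
// `read_size_from_env`); the DEFAULT_* values apply when a variable is unset.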
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";

#[cfg(test)]
mod tests;

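/// Opens the given column families on an already-opened database handle,
/// returning one typed `DBMap` per column family. A minimal usage sketch (the
/// column family names and key/value types here are illustrative):
///
/// ```ignore
/// let (users, scores) = reopen!(&db, "users";<UserId, User>, "scores";<UserId, u64>);
/// ```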
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
            ),*
        )
    };
}

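/// Evaluates a transactional expression, retrying with a random 0-50 ms
/// backoff whenever it fails with `TypedStoreError::RetryableTransaction`.
/// Defaults to at most 20 retries; pass `None` as the second argument to
/// retry indefinitely. A minimal sketch, where `do_transfer` is a
/// hypothetical operation returning `Result<_, TypedStoreError>`:
///
/// ```ignore
/// let result = retry_transaction!(do_transfer(&db), Some(5));
/// ```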
#[macro_export]
macro_rules! retry_transaction {
    ($transaction:expr) => {
        retry_transaction!($transaction, Some(20))
    };

    ($transaction:expr, $max_retries:expr $(,)?) => {{
        use std::time::Duration;

        use rand::{
            distributions::{Distribution, Uniform},
            rngs::ThreadRng,
        };
        use tracing::{error, info};

        let mut retries = 0;
        let max_retries = $max_retries;
        loop {
            let status = $transaction;
            match status {
                Err(TypedStoreError::RetryableTransaction) => {
                    retries += 1;
                    let delay = {
                        let mut rng = ThreadRng::default();
                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
                    };
                    if let Some(max_retries) = max_retries {
                        if retries > max_retries {
                            error!(?max_retries, "max retries exceeded");
                            break status;
                        }
                    }
                    if retries > 10 {
                        error!(?delay, ?retries, "excessive transaction retries...");
                    } else {
                        info!(
                            ?delay,
                            ?retries,
                            "transaction write conflict detected, sleeping"
                        );
                    }
                    std::thread::sleep(delay);
                }
                _ => break status,
            }
        }
    }};
}

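/// Variant of [`retry_transaction`] that never gives up (`max_retries = None`).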
#[macro_export]
macro_rules! retry_transaction_forever {
    ($transaction:expr) => {
        $crate::retry_transaction!($transaction, None)
    };
}

#[derive(Debug)]
pub struct DBWithThreadModeWrapper {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl DBWithThreadModeWrapper {
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for DBWithThreadModeWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub struct OptimisticTransactionDBWrapper {
    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl OptimisticTransactionDBWrapper {
    fn new(
        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for OptimisticTransactionDBWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

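/// A handle to an open RocksDB database: either a plain multi-threaded
/// database or an optimistic-transaction database. Most methods simply
/// delegate to whichever variant is active.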
#[derive(Debug)]
pub enum RocksDB {
    DBWithThreadMode(DBWithThreadModeWrapper),
    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
}

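// Forwards a method call to the underlying database of whichever variant is
// active.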
macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
        }
    }
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        delegate_call!(self.cancel_all_background_work(true))
    }
}

impl RocksDB {
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        delegate_call!(self.get(key))
    }

    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        delegate_call!(self.multi_get_cf_opt(keys, readopts))
    }

    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
    }

    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        delegate_call!(self.property_int_value_cf(cf, name))
    }

    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
    }

    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        delegate_call!(self.cf_handle(name))
    }

    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.create_cf(name, opts))
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        delegate_call!(self.drop_cf(name))
    }

    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
        fail_point!("delete-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn path(&self) -> &Path {
        delegate_call!(self.path())
    }

    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
        fail_point!("put-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
    }

    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        delegate_call!(self.try_catch_up_with_primary())
    }

    pub fn write(
        &self,
        batch: RocksDBBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (self, batch) {
            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            _ => Err(TypedStoreError::RocksDB(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn transaction_without_snapshot(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported on OptimisticTransactionDB")
            }
        }
    }

    pub fn transaction(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => {
                let mut tx_opts = OptimisticTransactionOptions::new();
                tx_opts.set_snapshot(true);

                Ok(db
                    .underlying
                    .transaction_opt(&WriteOptions::default(), &tx_opts))
            }
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported on OptimisticTransactionDB")
            }
        }
    }

    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
            }
            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
            ),
        }
    }

    pub fn iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
        mode: IteratorMode<'_>,
    ) -> RocksDBIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
            }
            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
            ),
        }
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        delegate_call!(self.compact_range_cf(cf, start, end))
    }

    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
        match self {
            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
            Self::OptimisticTransactionDB(d) => {
                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
            }
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint = match self {
            Self::DBWithThreadMode(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
            Self::OptimisticTransactionDB(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
        };
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        delegate_call!(self.flush_cf(cf))
    }

    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.set_options_cf(cf, opts))
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
        }
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
        }
    }

    pub fn db_name(&self) -> String {
        let name = match self {
            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
        };
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        delegate_call!(self.live_files())
    }
}

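/// Checks the corruption marker key: if a previous run left it set to `1`,
/// the database is considered corrupted and an error is returned. If the key
/// is absent, it is initialized to `1`; [`unmark_db_corruption`] resets it to
/// `0` (e.g. once the database is known to be healthy).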
pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;

    db.get(DB_CORRUPTED_KEY)
        .map_err(|e| format!("Failed to read corruption marker from database: {e}"))
        .and_then(|value| match value {
            Some(v) if v.first() == Some(&1) => Err(
                "Database is corrupted, please remove the current database and start clean!"
                    .to_string(),
            ),
            Some(_) => Ok(()),
            None => db
                .put(DB_CORRUPTED_KEY, [1])
                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
        })?;

    Ok(())
}

pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
}

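/// A consistent point-in-time view of either database flavor.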
pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}

impl<'a> RocksDBSnapshot<'a> {
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }

    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}

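/// A write batch matching the flavor of the database it will be applied to:
/// plain batches for `DBWithThreadMode`, transactional batches for
/// `OptimisticTransactionDB`.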
pub enum RocksDBBatch {
    Regular(rocksdb::WriteBatch),
    Transactional(rocksdb::WriteBatchWithTransaction<true>),
}

macro_rules! delegate_batch_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::Regular(b) => b.$method($($args),*),
            Self::Transactional(b) => b.$method($($args),*),
        }
    }
}

impl RocksDBBatch {
    fn size_in_bytes(&self) -> usize {
        delegate_batch_call!(self.size_in_bytes())
    }

    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        delegate_batch_call!(self.delete_cf(cf, key))
    }

    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.put_cf(cf, key, value))
    }

    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.merge_cf(cf, key, value))
    }

    pub fn delete_range_cf<K: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), TypedStoreError> {
        match self {
            Self::Regular(batch) => {
                batch.delete_range_cf(cf, from, to);
                Ok(())
            }
            Self::Transactional(_) => {
                panic!("delete_range_cf is not supported in transactional batches")
            }
        }
    }
}

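/// Per-database metrics configuration: the name under which metrics are
/// reported, plus sampling intervals for the read, write, and iterator perf
/// contexts.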
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

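/// A typed view over one RocksDB column family. Keys are serialized with a
/// big-endian, fixed-width encoding (`be_fix_int_ser`) so that lexicographic
/// byte order matches key order, and values are serialized with BCS. A
/// minimal usage sketch, assuming `db` is an already-opened `Arc<RocksDB>`
/// and the "scores" column family exists:
///
/// ```ignore
/// let scores: DBMap<String, u64> =
///     DBMap::reopen(&db, Some("scores"), &ReadWriteOptions::default(), false)?;
/// scores.insert(&"alice".to_string(), &10)?;
/// assert_eq!(scores.get(&"alice".to_string())?, Some(10));
/// ```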
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    _phantom: PhantomData<fn(K) -> V>,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Shutting down the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

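    /// Opens a database at `path` with a single column family: `opt_cf`, or
    /// RocksDB's default column family if `None`.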
    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }

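    /// Creates a typed view over an existing column family of an
    /// already-opened database; fails with `UnregisteredColumn` if the column
    /// family does not exist.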
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

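    /// Creates a new write batch whose flavor (plain or transactional)
    /// matches the underlying database.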
    pub fn batch(&self) -> DBBatch {
        let batch = match *self.rocksdb {
            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
            RocksDB::OptimisticTransactionDB(_) => {
                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
            }
        };
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
        self.rocksdb
            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(&self.cf(), keys_bytes?, false, &self.opts.readopts())
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

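    /// Publishes per-column-family RocksDB properties as metric gauges; a
    /// property that cannot be read is reported as `METRICS_ERROR` (-1).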
    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let Some(cf) = rocksdb.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                rocksdb.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
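        // Sum the file counts across LSM levels 0 through 6.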
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new(&self.rocksdb)
    }

    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new_without_snapshot(&self.rocksdb)
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
        Ok(self.rocksdb.snapshot())
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let iter = self.iterator_cf().map(Result::unwrap);
        for (key, value) in iter {
            num_keys += 1;
            key_bytes_total += key.len();
            value_bytes_total += value.len();
            key_hist.record(key.len() as u64)?;
            value_hist.record(value.len() as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

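    /// Returns an iterator that yields entries in reverse key order, bounded
    /// inclusively on both sides when bounds are given.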
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<SafeRevIter<'_, K, V>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(k));
        let readopts = self.create_read_options_with_range((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));

        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        let iter = SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        );
        Ok(SafeRevIter::new(iter, upper_bound_key.transpose()?))
    }

    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
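            // An excluded lower bound is turned into an included one by
            // incrementing the big-endian encoded key by one.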
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
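            // RocksDB upper bounds are exclusive, so an included upper bound
            // is widened by one; if the encoded key is already at its maximum
            // (all 0xff), no upper bound is applied at all.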
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                if !is_max(&key_buf) {
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}

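/// A write-only batch over one or more `DBMap`s that share the same
/// underlying database; all queued operations are applied atomically by
/// [`DBBatch::write`]. A minimal sketch, assuming `users` and `sessions` are
/// `DBMap`s opened on the same database:
///
/// ```ignore
/// let mut batch = users.batch();
/// batch.insert_batch(&users, [(user_id, user)])?;
/// batch.delete_batch(&sessions, [session_id])?;
/// batch.write()?; // committed atomically
/// ```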
pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: RocksDBBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: RocksDBBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}

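// Batch-building operations; each verifies that the target `DBMap` lives on
// the same underlying database as the batch.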
impl DBBatch {
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);

                Ok(())
            })?;
        Ok(())
    }

    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from)?;
        let to_buf = be_fix_int_ser(to)?;

        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                self.batch.put_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }
}

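/// An optimistic transaction over an `OptimisticTransactionDB`. Reads and
/// writes go through the transaction; [`DBTransaction::commit`] maps write
/// conflicts (`Busy`/`TryAgain`) to `TypedStoreError::RetryableTransaction`,
/// so commits are typically wrapped in [`retry_transaction`]. A minimal
/// sketch, assuming `map` is a `DBMap` backed by a transactional database:
///
/// ```ignore
/// retry_transaction!({
///     let mut tx = map.transaction()?;
///     tx.insert_batch(&map, [(key.clone(), value.clone())])?;
///     tx.commit()
/// })?;
/// ```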
pub struct DBTransaction<'a> {
    rocksdb: Arc<RocksDB>,
    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
}

impl<'a> DBTransaction<'a> {
    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction()?,
        })
    }

    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction_without_snapshot()?,
        })
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.transaction
                    .put_cf(&db.cf(), k_buf, v_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.transaction
                    .delete_cf(&db.cf(), k_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn snapshot(
        &self,
    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
    {
        self.transaction.snapshot()
    }

    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let k_buf = be_fix_int_ser(key)?;
        match self
            .transaction
            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?
        {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all)]
    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        self.transaction
            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
    }

    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError> {
        let cf = db.cf();
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
            .collect();

        let results = self
            .transaction
            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());

        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(
                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
                    Some(data) => Ok(Some(
                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                    )),
                    None => Ok(None),
                },
            )
            .collect();

        values_parsed
    }

    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Iter<'a, K, V> {
        let db_iter = self
            .transaction
            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
        Iter::new(
            db.cf.clone(),
            RocksDBRawIter::OptimisticTransaction(db_iter),
            None,
            None,
            None,
            None,
            None,
        )
    }

    #[instrument(level = "trace", skip_all)]
    pub fn commit(self) -> Result<(), TypedStoreError> {
        fail_point!("transaction-commit");
        self.transaction.commit().map_err(|e| match e.kind() {
            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
            _ => typed_store_err_from_rocks_err(e),
        })?;
        Ok(())
    }
}

macro_rules! delegate_iter_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DB(db) => db.$method($($args),*),
            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
            Self::OptimisticTransaction(db) => db.$method($($args),*),
        }
    }
}

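/// A raw, unserialized RocksDB iterator over any of the supported database
/// and transaction flavors.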
pub enum RocksDBRawIter<'a> {
    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
    OptimisticTransaction(
        rocksdb::DBRawIteratorWithThreadMode<
            'a,
            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
        >,
    ),
}

impl RocksDBRawIter<'_> {
    pub fn valid(&self) -> bool {
        delegate_iter_call!(self.valid())
    }

    pub fn key(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.key())
    }

    pub fn value(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.value())
    }

    pub fn next(&mut self) {
        delegate_iter_call!(self.next())
    }

    pub fn prev(&mut self) {
        delegate_iter_call!(self.prev())
    }

    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek(key))
    }

    pub fn seek_to_last(&mut self) {
        delegate_iter_call!(self.seek_to_last())
    }

    pub fn seek_to_first(&mut self) {
        delegate_iter_call!(self.seek_to_first())
    }

    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek_for_prev(key))
    }

    pub fn status(&self) -> Result<(), rocksdb::Error> {
        delegate_iter_call!(self.status())
    }
}

pub enum RocksDBIter<'a> {
    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
}

impl Iterator for RocksDBIter<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::DB(db) => db.next(),
            Self::OptimisticTransactionDB(db) => db.next(),
        }
    }
}

impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;
    type Iterator = Iter<'a, K, V>;
    type SafeIterator = SafeIter<'a, K, V>;

    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        let readopts = self.opts.readopts();
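        // `key_may_exist_cf` is a cheap, bloom-filter backed check that can
        // return false positives but never false negatives, so a negative
        // answer avoids the full lookup and a positive one is confirmed with
        // an actual read.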
        Ok(self
            .rocksdb
            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
            && self
                .rocksdb
                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .is_some())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let res = self
            .rocksdb
            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.rocksdb
            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;

        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        self.rocksdb
            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
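        // Dropping the column family may fail (e.g. if it does not exist);
        // the error is deliberately ignored and the column family is simply
        // recreated with default options.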
2032 let _ = self.rocksdb.drop_cf(&self.cf);
2033 self.rocksdb
2034 .create_cf(self.cf.clone(), &default_db_options().options)
2035 .map_err(typed_store_err_from_rocks_err)?;
2036 Ok(())
2037 }
2038
2039 #[instrument(level = "trace", skip_all, err)]
2048 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2049 let first_key = self.unbounded_iter().next().map(|(k, _v)| k);
2050 let last_key = self
2051 .reversed_safe_iter_with_bounds(None, None)?
2052 .next()
2053 .transpose()?
2054 .map(|(k, _v)| k);
2055 if let Some((first_key, last_key)) = first_key.zip(last_key) {
2056 let mut batch = self.batch();
2057 batch.schedule_delete_range(self, &first_key, &last_key)?;
2058 batch.write()?;
2059 }
2060 Ok(())
2061 }
2062
2063 fn is_empty(&self) -> bool {
2064 self.safe_iter().next().is_none()
2065 }
2066
2067 fn unbounded_iter(&'a self) -> Self::Iterator {
2070 let db_iter = self
2071 .rocksdb
2072 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2073 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2074 Iter::new(
2075 self.cf.clone(),
2076 db_iter,
2077 _timer,
2078 _perf_ctx,
2079 bytes_scanned,
2080 keys_scanned,
2081 Some(self.db_metrics.clone()),
2082 )
2083 }
2084
2085 fn iter_with_bounds(
2089 &'a self,
2090 lower_bound: Option<K>,
2091 upper_bound: Option<K>,
2092 ) -> Self::Iterator {
2093 let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2094 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2095 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2096 Iter::new(
2097 self.cf.clone(),
2098 db_iter,
2099 _timer,
2100 _perf_ctx,
2101 bytes_scanned,
2102 keys_scanned,
2103 Some(self.db_metrics.clone()),
2104 )
2105 }
2106
2107 fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2110 let readopts = self.create_read_options_with_range(range);
2111 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2112 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2113 Iter::new(
2114 self.cf.clone(),
2115 db_iter,
2116 _timer,
2117 _perf_ctx,
2118 bytes_scanned,
2119 keys_scanned,
2120 Some(self.db_metrics.clone()),
2121 )
2122 }
2123
2124 fn safe_iter(&'a self) -> Self::SafeIterator {
2125 let db_iter = self
2126 .rocksdb
2127 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2128 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2129 SafeIter::new(
2130 self.cf.clone(),
2131 db_iter,
2132 _timer,
2133 _perf_ctx,
2134 bytes_scanned,
2135 keys_scanned,
2136 Some(self.db_metrics.clone()),
2137 )
2138 }
2139
2140 fn safe_iter_with_bounds(
2141 &'a self,
2142 lower_bound: Option<K>,
2143 upper_bound: Option<K>,
2144 ) -> Self::SafeIterator {
2145 let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2146 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2147 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2148 SafeIter::new(
2149 self.cf.clone(),
2150 db_iter,
2151 _timer,
2152 _perf_ctx,
2153 bytes_scanned,
2154 keys_scanned,
2155 Some(self.db_metrics.clone()),
2156 )
2157 }
2158
2159 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2160 let readopts = self.create_read_options_with_range(range);
2161 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2162 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2163 SafeIter::new(
2164 self.cf.clone(),
2165 db_iter,
2166 _timer,
2167 _perf_ctx,
2168 bytes_scanned,
2169 keys_scanned,
2170 Some(self.db_metrics.clone()),
2171 )
2172 }
2173
    /// Fetches the values for the given keys in a single call, returning
    /// `None` for keys that are not present, in the order the keys were given.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

    /// Inserts the given key-value pairs through a single write batch, so the
    /// whole insertion is applied atomically.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

    /// Removes the given keys through a single write batch, so the whole
    /// removal is applied atomically.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

    /// Attempts to catch up with the primary database; only meaningful when
    /// this instance was opened in secondary mode.
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        self.rocksdb
            .try_catch_up_with_primary()
            .map_err(typed_store_err_from_rocks_err)
    }
}
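
// Usage sketch (illustrative only; assumes `db` is an already-opened
// `DBMap<u64, String>`). Both multi-operations above funnel through a single
// write batch, so each call is applied atomically:
//
//     db.multi_insert((0..3).map(|i| (i, i.to_string())))?;
//     let values = db.multi_get(0..3)?;
//     assert!(values.iter().all(|v| v.is_some()));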

impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
where
    J: Borrow<K>,
    U: Borrow<V>,
    K: Serialize,
    V: Serialize,
{
    type Error = TypedStoreError;

    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
    where
        T: Iterator<Item = (J, U)>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, iter)?;
        batch.write()
    }

    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
        let mut batch = self.batch();
        batch.insert_batch(self, slice_of_refs)?;
        batch.write()
    }
}

/// Reads a `usize` from the given environment variable, returning `None` (and
/// logging a warning) if the variable is unset or not a valid integer.
pub fn read_size_from_env(var_name: &str) -> Option<usize> {
    env::var(var_name)
        .ok()?
        .parse::<usize>()
        .tap_err(|e| {
            warn!(
                "Env var {} does not contain valid usize integer: {}",
                var_name, e
            )
        })
        .ok()
}
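
// A minimal sketch (hypothetical test, not part of the original suite) of the
// fallback pattern used throughout this module: `read_size_from_env` is paired
// with `unwrap_or`, so an unset or malformed variable yields the compiled-in
// default. `TYPED_STORE_DOC_EXAMPLE_MB` is a made-up variable name assumed to
// be unset in the test environment.
#[test]
fn read_size_from_env_fallback_sketch() {
    let size_mb = read_size_from_env("TYPED_STORE_DOC_EXAMPLE_MB").unwrap_or(256);
    assert_eq!(size_mb, 256);
}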

/// Read and write options applied to the operations of a single table.
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    /// When true, reads skip range tombstones; faster, but only safe for
    /// tables that never use range deletions.
    pub ignore_range_deletions: bool,
    sync_to_disk: bool,
}

impl ReadWriteOptions {
    pub fn readopts(&self) -> ReadOptions {
        let mut readopts = ReadOptions::default();
        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
        readopts
    }

    pub fn writeopts(&self) -> WriteOptions {
        let mut opts = WriteOptions::default();
        opts.set_sync(self.sync_to_disk);
        opts
    }

    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
        self.ignore_range_deletions = ignore;
        self
    }
}

impl Default for ReadWriteOptions {
    fn default() -> Self {
        Self {
            ignore_range_deletions: true,
            // Synchronous writes are opt-in via IOTA_DB_SYNC_TO_DISK (any
            // value other than "0").
            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
        }
    }
}
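
// A minimal sketch (hypothetical test) of the builder-style setter above:
// disabling `ignore_range_deletions` makes reads observe range tombstones
// again, the safer (if slower) choice for tables that delete key ranges.
#[test]
fn read_write_options_builder_sketch() {
    let opts = ReadWriteOptions::default().set_ignore_range_deletions(false);
    assert!(!opts.ignore_range_deletions);
    let _readopts = opts.readopts(); // the flag is carried into RocksDB ReadOptions
}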

/// Per-table options: RocksDB column-family options plus this crate's
/// read/write options.
#[derive(Default, Clone)]
pub struct DBOptions {
    pub options: rocksdb::Options,
    pub rw_options: ReadWriteOptions,
}

impl DBOptions {
    /// Optimizes the table for point lookups at the cost of scans.
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }

    /// Optimizes the table for large values that are rarely scanned. Can be
    /// disabled via the DISABLE_BLOB_STORAGE env var.
    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
            info!("Large value blob storage optimization is disabled via env var.");
            return self;
        }

        // Values at or above `min_blob_size` are stored in separate blob
        // files, compressed with LZ4 and garbage-collected.
        self.options.set_enable_blob_files(true);
        self.options
            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
        self.options.set_enable_blob_gc(true);
        self.options.set_min_blob_size(min_blob_size);

        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let target_file_size_base = 64 << 20; // 64 MiB
        self.options
            .set_target_file_size_base(target_file_size_base);
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options
            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);

        self
    }

    /// Optimizes the table for reads by sizing the block cache accordingly.
    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
        self
    }

    /// Optimizes the whole database for write throughput by capping the total
    /// memtable and WAL footprint.
    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
        self.options
            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
        self.options
            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
        self
    }

    /// Optimizes the table for write throughput under level compaction:
    /// larger and more numerous write buffers with matching L0 triggers.
    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Like `optimize_for_write_throughput`, but uses universal compaction,
    /// which suits tables that see few or no deletions.
    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        self.options
            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
        compaction_options.set_max_size_amplification_percent(10000);
        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
        self.options
            .set_universal_compaction_options(&compaction_options);

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Overrides the block-based table options (cache and block sizes).
    pub fn set_block_options(
        mut self,
        block_cache_size_mb: usize,
        block_size_bytes: usize,
    ) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(
                block_cache_size_mb,
                block_size_bytes,
            ));
        self
    }

    /// Disables write stalls triggered by pending compaction bytes.
    pub fn disable_write_throttling(mut self) -> DBOptions {
        self.options.set_soft_pending_compaction_bytes_limit(0);
        self.options.set_hard_pending_compaction_bytes_limit(0);
        self
    }
}

/// Base options used for every table unless overridden.
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    // Cap open files at an eighth of the (raised) file-descriptor limit.
    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    // Shard the table cache to reduce lock contention.
    opt.set_table_cache_num_shard_bits(10);

    // LZ4 everywhere, with dictionary-trained zstd for the bottommost level.
    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    // 128 MiB block cache with 16 KiB blocks.
    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    // Memtable bloom filter to speed up point lookups on recent writes.
    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}
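
// A minimal sketch (hypothetical test) of composing the tuning helpers above:
// the `DBOptions` builder methods consume and return `self`, so per-table
// profiles are assembled by chaining. The sizes here are illustrative, not
// recommendations.
#[test]
fn db_options_chaining_sketch() {
    let _opts = default_db_options()
        .optimize_for_write_throughput()
        .set_block_options(64, 16 << 10); // 64 MiB block cache, 16 KiB blocks
}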

fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
    let mut block_options = BlockBasedOptions::default();
    block_options.set_block_size(block_size_bytes);
    // `<< 20` converts MiB to bytes.
    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
    // ~10 bits per key, full (not block-based) bloom filter.
    block_options.set_bloom_filter(10.0, false);
    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
    block_options
}

/// Opens a database with the given column families, applying the same options
/// (or the defaults) to the database and every column family.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
pub fn open_cf<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[&str],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let options = db_options.unwrap_or_else(|| default_db_options().options);
    let column_descriptors: Vec<_> = opt_cfs
        .iter()
        .map(|name| (*name, options.clone()))
        .collect();
    open_cf_opts(
        path,
        Some(options.clone()),
        metric_conf,
        &column_descriptors[..],
    )
}
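
// Usage sketch (comment only; the path, column-family names, and `MetricConf`
// construction are illustrative). Passing `None` for `db_options` falls back
// to `default_db_options()`, and missing column families are created on open:
//
//     let db = open_cf(
//         "/tmp/example-db",
//         None,
//         MetricConf::default(),
//         &["objects", "transactions"],
//     )?;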

fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    options
}

/// Opens a database with per-column-family options. Column families that
/// already exist on disk but are not listed are opened with default options,
/// since RocksDB requires every existing column family to be specified.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

/// Like `open_cf_opts`, but opens the database as an
/// `OptimisticTransactionDB`, which supports transactional writes.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts_transactional<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
            &options,
            path,
            cfs.into_iter()
                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
        )
        .map_err(typed_store_err_from_rocks_err)?;
        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

/// Opens a read-only secondary instance of the database at `primary_path`.
/// If `secondary_path` is not given, a sibling "SECONDARY" directory next to
/// the primary is used for the secondary instance's own files.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        // Secondary instances must keep all files open (max_open_files = -1).
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add any existing column families the caller did not specify.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
        )))
    })
}
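
// Usage note (illustrative): a secondary instance is read-only and observes
// the primary's state as of the last catch-up. `open_cf_opts_secondary`
// performs one catch-up on open; afterwards, callers refresh the view through
// `Map::try_catch_up_with_primary` on the returned handle.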

/// Lists the column families of the database at `path`, excluding RocksDB's
/// implicit "default" column family.
pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
    const DB_DEFAULT_CF_NAME: &str = "default";

    let opts = rocksdb::Options::default();
    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
        .map_err(|e| e.into())
        .map(|q| {
            q.iter()
                .filter_map(|s| {
                    // Skip the default column family, which RocksDB always creates.
                    if s != DB_DEFAULT_CF_NAME {
                        Some(s.clone())
                    } else {
                        None
                    }
                })
                .collect()
        })
}

/// Serializes with bincode using big-endian byte order and fixed-width
/// integers, so that the encoded bytes sort in the same order as the keys
/// themselves.
#[inline]
pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
where
    S: ?Sized + serde::Serialize,
{
    bincode::DefaultOptions::new()
        .with_big_endian()
        .with_fixint_encoding()
        .serialize(t)
        .map_err(typed_store_err_from_bincode_err)
}
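
// A minimal sketch (hypothetical test) of why keys are serialized big-endian
// with fixed-width integers: RocksDB sorts keys byte-wise, and this encoding
// makes byte-wise order coincide with numeric order, which the bounded and
// range iterators above depend on.
#[test]
fn be_fix_int_ser_preserves_order_sketch() {
    let a = be_fix_int_ser(&1u64).unwrap();
    let b = be_fix_int_ser(&256u64).unwrap();
    assert_eq!(a.len(), b.len()); // fixed width: 8 bytes for every u64
    assert!(a < b); // lexicographic order agrees with numeric order
}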

/// Per-table `DBOptions`, keyed by column-family name.
#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
        Self(map)
    }

    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
        self.0.clone()
    }
}

/// How a database is opened: as the primary instance, or as a read-only
/// secondary (optionally with its own path for secondary files).
pub enum RocksDBAccessType {
    Primary,
    Secondary(Option<PathBuf>),
}

/// Destroys the RocksDB database at `path`.
pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
}

/// Combines the caller's column families with any that already exist on disk
/// but were not specified (those get default options), since RocksDB refuses
/// to open a database unless every existing column family is listed.
fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

/// Adds one to a big-endian byte string, saturating at the all-0xFF maximum.
fn big_endian_saturating_add_one(v: &mut [u8]) {
    if is_max(v) {
        return;
    }
    // Propagate the carry from the least significant (last) byte upward.
    for i in (0..v.len()).rev() {
        if v[i] == u8::MAX {
            v[i] = 0;
        } else {
            v[i] += 1;
            break;
        }
    }
}

/// Returns true iff every byte is 0xFF (vacuously true for an empty slice).
fn is_max(v: &[u8]) -> bool {
    v.iter().all(|&x| x == u8::MAX)
}
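
// Worked example (illustrative): [0x01, 0xFF] increments to [0x02, 0x00],
// while an all-0xFF key saturates and is returned unchanged. This is what
// allows an inclusive upper bound to be converted into RocksDB's exclusive
// `set_iterate_upper_bound` without overflow.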

#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
#[test]
fn test_helpers() {
    let v = vec![];
    assert!(is_max(&v));

    fn check_add(v: Vec<u8>) {
        let mut v = v;
        let num = Num32::from_big_endian(&v);
        big_endian_saturating_add_one(&mut v);
        assert!(num + 1 == Num32::from_big_endian(&v));
    }

    uint::construct_uint! {
        struct Num32(4);
    }

    let mut v = vec![255; 32];
    big_endian_saturating_add_one(&mut v);
    assert!(Num32::MAX == Num32::from_big_endian(&v));

    check_add(vec![1; 32]);
    check_add(vec![6; 32]);
    check_add(vec![254; 32]);
}