pub mod errors;
pub(crate) mod iter;
pub(crate) mod keys;
pub(crate) mod safe_iter;
pub mod util;
pub(crate) mod values;

use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashSet},
    env,
    ffi::CStr,
    marker::PhantomData,
    ops::{Bound, RangeBounds},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use bincode::Options;
use collectable::TryExtend;
use iota_macros::{fail_point, nondeterministic};
use itertools::Itertools;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::{
    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
};
use serde::{Serialize, de::DeserializeOwned};
use tap::TapFallible;
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};

use self::{iter::Iter, keys::Keys, values::Values};
use crate::{
    TypedStoreError,
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    rocks::{
        errors::{
            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
            typed_store_err_from_rocks_err,
        },
        safe_iter::SafeIter,
    },
    traits::{Map, TableSummary},
};

const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

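// When set, these environment variables override the corresponding DEFAULT_* values
// at options-build time (the reading happens in the options-construction code, which
// is not part of this section); names ending in `_MB` are presumably interpreted as
// sizes in MiB, e.g. `DB_WRITE_BUFFER_SIZE_MB=2048`.
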
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";

#[cfg(test)]
mod tests;

#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
            ),*
        )
    };
}
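
// Example usage (illustrative sketch, not part of the original source; assumes an
// opened `db: &Arc<RocksDB>` whose column families "users" and "events" are already
// registered, with hypothetical key/value types):
//
//     let (users, events) =
//         reopen!(&db, "users";<u64, String>, "events";<u64, String>);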

#[macro_export]
macro_rules! retry_transaction {
    ($transaction:expr) => {
        retry_transaction!($transaction, Some(20))
    };

    (
        $transaction:expr,
        $max_retries:expr $(,)?
    ) => {{
        use std::time::Duration;

        use rand::{
            distributions::{Distribution, Uniform},
            rngs::ThreadRng,
        };
        use tracing::{error, info};

        let mut retries = 0;
        let max_retries = $max_retries;
        loop {
            let status = $transaction;
            match status {
                Err(TypedStoreError::RetryableTransaction) => {
                    retries += 1;
                    let delay = {
                        let mut rng = ThreadRng::default();
                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
                    };
                    if let Some(max_retries) = max_retries {
                        if retries > max_retries {
                            error!(?max_retries, "max retries exceeded");
                            break status;
                        }
                    }
                    if retries > 10 {
                        error!(?delay, ?retries, "excessive transaction retries...");
                    } else {
                        info!(
                            ?delay,
                            ?retries,
                            "transaction write conflict detected, sleeping"
                        );
                    }
                    std::thread::sleep(delay);
                }
                _ => break status,
            }
        }
    }};
}

#[macro_export]
macro_rules! retry_transaction_forever {
    ($transaction:expr) => {
        $crate::retry_transaction!($transaction, None)
    };
}
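
// Example usage (illustrative sketch; `do_txn()` stands for any expression returning
// `Result<_, TypedStoreError>`; the expression is re-evaluated on every retry):
//
//     let result = retry_transaction!(do_txn(), Some(5));
//     let result = retry_transaction_forever!(do_txn());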

#[derive(Debug)]
pub struct DBWithThreadModeWrapper {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl DBWithThreadModeWrapper {
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for DBWithThreadModeWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub struct OptimisticTransactionDBWrapper {
    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl OptimisticTransactionDBWrapper {
    fn new(
        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for OptimisticTransactionDBWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub enum RocksDB {
    DBWithThreadMode(DBWithThreadModeWrapper),
    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
}

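// `delegate_call!` forwards a method invocation to whichever underlying RocksDB
// flavor `self` wraps; e.g. `delegate_call!(self.flush_cf(cf))` expands to a match
// that calls `flush_cf(cf)` on either the plain or the optimistic-transaction DB.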
macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
        }
    }
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        delegate_call!(self.cancel_all_background_work(true))
    }
}

impl RocksDB {
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        delegate_call!(self.get(key))
    }

    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        delegate_call!(self.multi_get_cf_opt(keys, readopts))
    }

    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
    }

    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        delegate_call!(self.property_int_value_cf(cf, name))
    }

    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
    }

    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        delegate_call!(self.cf_handle(name))
    }

    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.create_cf(name, opts))
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        delegate_call!(self.drop_cf(name))
    }

    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.delete_file_in_range_cf(cf, from, to))
    }

    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
        fail_point!("delete-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn path(&self) -> &Path {
        delegate_call!(self.path())
    }

    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
        fail_point!("put-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
    }

    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        delegate_call!(self.try_catch_up_with_primary())
    }

    pub fn write(
        &self,
        batch: RocksDBBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (self, batch) {
            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            _ => Err(TypedStoreError::RocksDB(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[expect(clippy::let_and_return)]
        ret
    }

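    // Note: the two transaction constructors below are only meaningful for the
    // `OptimisticTransactionDB` flavor; calling either of them on a
    // `DBWithThreadMode` database panics.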
    pub fn transaction_without_snapshot(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
            Self::DBWithThreadMode(_) => panic!(),
        }
    }

    pub fn transaction(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => {
                let mut tx_opts = OptimisticTransactionOptions::new();
                tx_opts.set_snapshot(true);

                Ok(db
                    .underlying
                    .transaction_opt(&WriteOptions::default(), &tx_opts))
            }
            Self::DBWithThreadMode(_) => panic!(),
        }
    }

    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
            }
            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
            ),
        }
    }

    pub fn iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
        mode: IteratorMode<'_>,
    ) -> RocksDBIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
            }
            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
            ),
        }
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        delegate_call!(self.compact_range_cf(cf, start, end))
    }

    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
        match self {
            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
            Self::OptimisticTransactionDB(d) => {
                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
            }
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint = match self {
            Self::DBWithThreadMode(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
            Self::OptimisticTransactionDB(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
        };
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        delegate_call!(self.flush_cf(cf))
    }

    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.set_options_cf(cf, opts))
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
        }
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
        }
    }

    pub fn db_name(&self) -> String {
        let name = match self {
            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
        };
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        delegate_call!(self.live_files())
    }
}

pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;

    db.get(DB_CORRUPTED_KEY)
        .map_err(|e| format!("Failed to read the corruption marker: {e}"))
        .and_then(|value| match value {
            Some(v) if v.first() == Some(&1) => Err(
                "Database is corrupted, please remove the current database and start clean!"
                    .to_string(),
            ),
            Some(_) => Ok(()),
            None => db
                .put(DB_CORRUPTED_KEY, [1])
                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
        })?;

    Ok(())
}

pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
}
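
// Corruption-marker protocol, as implemented by the two functions above: on startup
// a missing marker is written as 1 ("possibly dirty"); a marker equal to 1 aborts
// with a corruption error; any other value is accepted. `unmark_db_corruption`
// resets the marker to 0, presumably once the database is known to be good again.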

pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}

impl<'a> RocksDBSnapshot<'a> {
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }

    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}

pub enum RocksDBBatch {
    Regular(rocksdb::WriteBatch),
    Transactional(rocksdb::WriteBatchWithTransaction<true>),
}

macro_rules! delegate_batch_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::Regular(b) => b.$method($($args),*),
            Self::Transactional(b) => b.$method($($args),*),
        }
    }
}

impl RocksDBBatch {
    fn size_in_bytes(&self) -> usize {
        delegate_batch_call!(self.size_in_bytes())
    }

    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        delegate_batch_call!(self.delete_cf(cf, key))
    }

    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.put_cf(cf, key, value))
    }

    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.merge_cf(cf, key, value))
    }

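    // Note: range deletes are only supported on `Regular` batches; RocksDB does not
    // support `DeleteRange` inside transactional write batches, which is why the
    // `Transactional` arm below panics.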
    pub fn delete_range_cf<K: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), TypedStoreError> {
        match self {
            Self::Regular(batch) => {
                batch.delete_range_cf(cf, from, to);
                Ok(())
            }
            Self::Transactional(_) => panic!(),
        }
    }
}

#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    _phantom: PhantomData<fn(K) -> V>,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning from the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }
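
    // Example usage (illustrative sketch; the path, CF name, and key/value types are
    // hypothetical):
    //
    //     let map: DBMap<u64, String> = DBMap::open(
    //         "/tmp/mydb",
    //         MetricConf::new("mydb"),
    //         None,
    //         Some("my_cf"),
    //         &ReadWriteOptions::default(),
    //     )?;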

    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

    pub fn batch(&self) -> DBBatch {
        let batch = match *self.rocksdb {
            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
            RocksDB::OptimisticTransactionDB(_) => {
                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
            }
        };
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }
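
    // Example usage (illustrative sketch; `map` is a hypothetical `DBMap<u64, String>`):
    //
    //     let mut batch = map.batch();
    //     batch.insert_batch(&map, [(1u64, "one".to_string())])?;
    //     batch.delete_batch(&map, [2u64])?;
    //     batch.write()?;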

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
        self.rocksdb
            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                false,
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let Some(cf) = rocksdb.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                rocksdb.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new(&self.rocksdb)
    }

    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new_without_snapshot(&self.rocksdb)
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
        Ok(self.rocksdb.snapshot())
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let iter = self.iterator_cf().map(Result::unwrap);
        for (key, value) in iter {
            num_keys += 1;
            key_bytes_total += key.len();
            value_bytes_total += value.len();
            key_hist.record(key.len() as u64)?;
            value_hist.record(value.len() as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                if !is_max(&key_buf) {
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}
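
// Worked example of the bound conversion above (illustrative): keys serialize via
// `be_fix_int_ser` to fixed-width big-endian bytes, so for a `DBMap<u16, _>` the
// range `3..=7` yields a lower bound of ser(3) = [0x00, 0x03] used as-is (RocksDB
// lower bounds are inclusive), while the included upper bound ser(7) = [0x00, 0x07]
// is bumped by `big_endian_saturating_add_one` to [0x00, 0x08], since
// `set_iterate_upper_bound` is always exclusive. Excluded lower bounds are bumped
// the same way; excluded upper bounds pass through unchanged.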

pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: RocksDBBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: RocksDBBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}

impl DBBatch {
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);

                Ok(())
            })?;
        Ok(())
    }

    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from)?;
        let to_buf = be_fix_int_ser(to)?;

        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                self.batch.put_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.batch.merge_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        Ok(self)
    }

    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, B)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.merge_cf(&db.cf(), k_buf, v);
                Ok(())
            })?;
        Ok(self)
    }
}
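
// Example usage (illustrative sketch; `map` is a hypothetical `DBMap<u64, String>`;
// the `&mut Self` return value allows chaining before the final consuming `write`):
//
//     let mut batch = map.batch();
//     batch
//         .insert_batch(&map, [(1u64, "a".to_string())])?
//         .insert_batch(&map, [(2u64, "b".to_string())])?;
//     batch.write()?;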

pub struct DBTransaction<'a> {
    rocksdb: Arc<RocksDB>,
    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
}

impl<'a> DBTransaction<'a> {
    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction()?,
        })
    }

    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction_without_snapshot()?,
        })
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.transaction
                    .put_cf(&db.cf(), k_buf, v_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.transaction
                    .delete_cf(&db.cf(), k_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn snapshot(
        &self,
    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
    {
        self.transaction.snapshot()
    }

    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let k_buf = be_fix_int_ser(key)?;
        match self
            .transaction
            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?
        {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        self.transaction
            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
    }

    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError> {
        let cf = db.cf();
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
            .collect();

        let results = self
            .transaction
            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());

        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(
                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
                    Some(data) => Ok(Some(
                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                    )),
                    None => Ok(None),
                },
            )
            .collect();

        values_parsed
    }

    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Iter<'a, K, V> {
        let db_iter = self
            .transaction
            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
        Iter::new(
            db.cf.clone(),
            RocksDBRawIter::OptimisticTransaction(db_iter),
            None,
            None,
            None,
            None,
            None,
        )
    }

    pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Keys<'a, K> {
        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
            self.transaction
                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
        );
        db_iter.seek_to_first();

        Keys::new(db_iter)
    }

    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Values<'a, V> {
        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
            self.transaction
                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
        );
        db_iter.seek_to_first();

        Values::new(db_iter)
    }

    pub fn commit(self) -> Result<(), TypedStoreError> {
        fail_point!("transaction-commit");
        self.transaction.commit().map_err(|e| match e.kind() {
            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
            _ => typed_store_err_from_rocks_err(e),
        })?;
        Ok(())
    }
}
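
// Example usage (illustrative sketch; `map` is a hypothetical `DBMap<u64, String>`
// backed by an `OptimisticTransactionDB`; `commit` maps Busy/TryAgain errors to
// `TypedStoreError::RetryableTransaction`, which `retry_transaction!` retries):
//
//     let result = retry_transaction!({
//         let mut txn = map.transaction()?;
//         txn.insert_batch(&map, [(1u64, "one".to_string())])?;
//         txn.commit()
//     });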

macro_rules! delegate_iter_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DB(db) => db.$method($($args),*),
            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
            Self::OptimisticTransaction(db) => db.$method($($args),*),
        }
    }
}

pub enum RocksDBRawIter<'a> {
    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
    OptimisticTransaction(
        rocksdb::DBRawIteratorWithThreadMode<
            'a,
            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
        >,
    ),
}

impl RocksDBRawIter<'_> {
    pub fn valid(&self) -> bool {
        delegate_iter_call!(self.valid())
    }
    pub fn key(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.key())
    }
    pub fn value(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.value())
    }
    pub fn next(&mut self) {
        delegate_iter_call!(self.next())
    }
    pub fn prev(&mut self) {
        delegate_iter_call!(self.prev())
    }
    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek(key))
    }
    pub fn seek_to_last(&mut self) {
        delegate_iter_call!(self.seek_to_last())
    }
    pub fn seek_to_first(&mut self) {
        delegate_iter_call!(self.seek_to_first())
    }
    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek_for_prev(key))
    }
    pub fn status(&self) -> Result<(), rocksdb::Error> {
        delegate_iter_call!(self.status())
    }
}
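
// Note on iteration semantics: once a raw iterator reports `!valid()`, callers
// should consult `status()` to distinguish ordinary end-of-range from an underlying
// RocksDB read error; this is the kind of check the `SafeIter` wrapper exists for.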

pub enum RocksDBIter<'a> {
    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
}

impl Iterator for RocksDBIter<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::DB(db) => db.next(),
            Self::OptimisticTransactionDB(db) => db.next(),
        }
    }
}

impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;
    type Iterator = Iter<'a, K, V>;
    type SafeIterator = SafeIter<'a, K, V>;
    type Keys = Keys<'a, K>;
    type Values = Values<'a, V>;

    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        let readopts = self.opts.readopts();
        Ok(self
            .rocksdb
            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
            && self
                .rocksdb
                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .is_some())
    }
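
    // `contains_key` above first consults `key_may_exist_cf`, a cheap bloom-filter
    // backed check that may return false positives but never false negatives, and
    // only pays for the authoritative `get_pinned_cf_opt` read when it returns true.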
1942
1943 #[instrument(level = "trace", skip_all, err)]
1944 fn multi_contains_keys<J>(
1945 &self,
1946 keys: impl IntoIterator<Item = J>,
1947 ) -> Result<Vec<bool>, Self::Error>
1948 where
1949 J: Borrow<K>,
1950 {
1951 let values = self.multi_get_pinned(keys)?;
1952 Ok(values.into_iter().map(|v| v.is_some()).collect())
1953 }
1954
1955 #[instrument(level = "trace", skip_all, err)]
1956 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1957 let _timer = self
1958 .db_metrics
1959 .op_metrics
1960 .rocksdb_get_latency_seconds
1961 .with_label_values(&[&self.cf])
1962 .start_timer();
1963 let perf_ctx = if self.get_sample_interval.sample() {
1964 Some(RocksDBPerfContext)
1965 } else {
1966 None
1967 };
1968 let key_buf = be_fix_int_ser(key)?;
1969 let res = self
1970 .rocksdb
1971 .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1972 .map_err(typed_store_err_from_rocks_err)?;
1973 self.db_metrics
1974 .op_metrics
1975 .rocksdb_get_bytes
1976 .with_label_values(&[&self.cf])
1977 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1978 if perf_ctx.is_some() {
1979 self.db_metrics
1980 .read_perf_ctx_metrics
1981 .report_metrics(&self.cf);
1982 }
1983 match res {
1984 Some(data) => Ok(Some(
1985 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1986 )),
1987 None => Ok(None),
1988 }
1989 }
1990
1991 #[instrument(level = "trace", skip_all, err)]
1992 fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
1993 let _timer = self
1994 .db_metrics
1995 .op_metrics
1996 .rocksdb_get_latency_seconds
1997 .with_label_values(&[&self.cf])
1998 .start_timer();
1999 let perf_ctx = if self.get_sample_interval.sample() {
2000 Some(RocksDBPerfContext)
2001 } else {
2002 None
2003 };
2004 let key_buf = be_fix_int_ser(key)?;
2005 let res = self
2006 .rocksdb
2007 .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
2008 .map_err(typed_store_err_from_rocks_err)?;
2009 self.db_metrics
2010 .op_metrics
2011 .rocksdb_get_bytes
2012 .with_label_values(&[&self.cf])
2013 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
2014 if perf_ctx.is_some() {
2015 self.db_metrics
2016 .read_perf_ctx_metrics
2017 .report_metrics(&self.cf);
2018 }
2019 match res {
2020 Some(data) => Ok(Some(data.to_vec())),
2021 None => Ok(None),
2022 }
2023 }
2024
2025 #[instrument(level = "trace", skip_all, err)]
2026 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
2027 let timer = self
2028 .db_metrics
2029 .op_metrics
2030 .rocksdb_put_latency_seconds
2031 .with_label_values(&[&self.cf])
2032 .start_timer();
2033 let perf_ctx = if self.write_sample_interval.sample() {
2034 Some(RocksDBPerfContext)
2035 } else {
2036 None
2037 };
2038 let key_buf = be_fix_int_ser(key)?;
2039 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
2040 self.db_metrics
2041 .op_metrics
2042 .rocksdb_put_bytes
2043 .with_label_values(&[&self.cf])
2044 .observe((key_buf.len() + value_buf.len()) as f64);
2045 if perf_ctx.is_some() {
2046 self.db_metrics
2047 .write_perf_ctx_metrics
2048 .report_metrics(&self.cf);
2049 }
2050 self.rocksdb
2051 .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
2052 .map_err(typed_store_err_from_rocks_err)?;
2053
2054 let elapsed = timer.stop_and_record();
2055 if elapsed > 1.0 {
2056 warn!(?elapsed, cf = ?self.cf, "very slow insert");
2057 self.db_metrics
2058 .op_metrics
2059 .rocksdb_very_slow_puts_count
2060 .with_label_values(&[&self.cf])
2061 .inc();
2062 self.db_metrics
2063 .op_metrics
2064 .rocksdb_very_slow_puts_duration_ms
2065 .with_label_values(&[&self.cf])
2066 .inc_by((elapsed * 1000.0) as u64);
2067 }
2068
2069 Ok(())
2070 }
2071
2072 #[instrument(level = "trace", skip_all, err)]
2073 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        self.rocksdb
            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    /// Deletes whole SST files whose key range lies entirely within
    /// `[from, to]`. This is a coarse, non-transactional bulk-cleanup
    /// primitive: keys in partially covered files are left in place.
    #[instrument(level = "trace", skip_all, err)]
    fn delete_file_in_range(&self, from: &K, to: &K) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(from.borrow())?;
        let to_buf = be_fix_int_ser(to.borrow())?;
        self.rocksdb
            .delete_file_in_range(&self.cf(), from_buf, to_buf)
            .map_err(typed_store_err_from_rocks_err)?;
        Ok(())
    }

    /// Drops and recreates the column family, discarding all of its data.
    /// Not crash-safe: if the process dies between the drop and the
    /// re-creation, the column family is left missing.
    #[instrument(level = "trace", skip_all, err)]
    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
        let _ = self.rocksdb.drop_cf(&self.cf);
        self.rocksdb
            .create_cf(self.cf.clone(), &default_db_options().options)
            .map_err(typed_store_err_from_rocks_err)?;
        Ok(())
    }

    /// Removes every entry by scheduling a single range deletion spanning the
    /// current first and last keys. Cheap to write, but readers pay for the
    /// tombstone until compaction catches up.
    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let mut iter = self.unbounded_iter().seek_to_first();
        let first_key = iter.next().map(|(k, _v)| k);
        let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

    fn is_empty(&self) -> bool {
        self.safe_iter().next().is_none()
    }

    fn unbounded_iter(&'a self) -> Self::Iterator {
        let db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        Iter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Self::Iterator {
        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        Iter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
        let readopts = self.create_read_options_with_range(range);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        Iter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_iter(&'a self) -> Self::SafeIterator {
        let db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_range(range);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn keys(&'a self) -> Self::Keys {
        let mut db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        db_iter.seek_to_first();

        Keys::new(db_iter)
    }

    fn values(&'a self) -> Self::Values {
        let mut db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        db_iter.seek_to_first();

        Values::new(db_iter)
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_get_raw_bytes<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self
            .multi_get_pinned(keys)?
            .into_iter()
            .map(|val| val.map(|v| v.to_vec()))
            .collect();
        Ok(results)
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

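    /// Like `multi_get`, but fetches keys in chunks of `chunk_size` against a
    /// single snapshot, bounding peak memory on very large key sets while
    /// keeping all chunks mutually consistent.
    ///
    /// A minimal usage sketch; the map, `ids`, and the chunk size are
    /// hypothetical:
    ///
    /// ```ignore
    /// let values = map.chunked_multi_get(ids.iter(), 1_000)?;
    /// assert_eq!(values.len(), ids.len());
    /// ```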
    #[instrument(level = "trace", skip_all, err)]
    fn chunked_multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
        chunk_size: usize,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let cf = self.cf();
        // Serialize all keys up front so a serialization failure surfaces as
        // an error instead of a panic mid-read.
        let keys_bytes = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect::<Result<Vec<_>, _>>()?;
        let chunked_keys = keys_bytes.into_iter().map(|k| (&cf, k)).chunks(chunk_size);
        let snapshot = self.snapshot()?;
        let mut results = vec![];
        for chunk in &chunked_keys {
            let chunk_result = snapshot.multi_get_cf(chunk);
            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
                .into_iter()
                .map(|value_byte| {
                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
                    match value_byte {
                        Some(data) => Ok(Some(
                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                        )),
                        None => Ok(None),
                    }
                })
                .collect();
            results.extend(values_parsed?);
        }
        Ok(results)
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        self.rocksdb
            .try_catch_up_with_primary()
            .map_err(typed_store_err_from_rocks_err)
    }
}

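/// Bulk insertion through the generic [`TryExtend`] trait. A minimal sketch,
/// assuming a `DBMap` named `map` opened elsewhere:
///
/// ```ignore
/// let mut pairs = vec![(1u64, "a".to_string()), (2, "b".to_string())].into_iter();
/// map.try_extend(&mut pairs)?;
/// ```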
impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
where
    J: Borrow<K>,
    U: Borrow<V>,
    K: Serialize,
    V: Serialize,
{
    type Error = TypedStoreError;

    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
    where
        T: Iterator<Item = (J, U)>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, iter)?;
        batch.write()
    }

    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
        let mut batch = self.batch();
        batch.insert_batch(self, slice_of_refs)?;
        batch.write()
    }
}

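/// Reads a `usize` from the given environment variable, returning `None` when
/// it is unset and logging a warning when it is set but fails to parse.
///
/// A minimal sketch; the variable name is hypothetical:
///
/// ```ignore
/// std::env::set_var("MY_BUFFER_MB", "256");
/// assert_eq!(read_size_from_env("MY_BUFFER_MB"), Some(256));
/// ```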
pub fn read_size_from_env(var_name: &str) -> Option<usize> {
    env::var(var_name)
        .ok()?
        .parse::<usize>()
        .tap_err(|e| {
            warn!(
                "Env var {} does not contain a valid usize: {}",
                var_name, e
            )
        })
        .ok()
}

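/// Per-table read/write tuning layered on top of the RocksDB options: whether
/// reads skip range-deletion tombstones, and whether writes fsync before
/// returning.
///
/// A minimal sketch of the builder-style setter:
///
/// ```ignore
/// let rw = ReadWriteOptions::default().set_ignore_range_deletions(false);
/// ```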
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    pub ignore_range_deletions: bool,
    /// Whether every write requests an fsync before returning; see
    /// `IOTA_DB_SYNC_TO_DISK` in the `Default` impl.
    sync_to_disk: bool,
}

impl ReadWriteOptions {
    pub fn readopts(&self) -> ReadOptions {
        let mut readopts = ReadOptions::default();
        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
        readopts
    }

    pub fn writeopts(&self) -> WriteOptions {
        let mut opts = WriteOptions::default();
        opts.set_sync(self.sync_to_disk);
        opts
    }

    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
        self.ignore_range_deletions = ignore;
        self
    }
}

impl Default for ReadWriteOptions {
    fn default() -> Self {
        Self {
            ignore_range_deletions: true,
            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
        }
    }
}

/// Bundle of the RocksDB options and read/write options for a single table
/// (column family).
#[derive(Default, Clone)]
pub struct DBOptions {
    pub options: rocksdb::Options,
    pub rw_options: ReadWriteOptions,
}

impl DBOptions {
    /// Optimizes the table for point lookups with the given block cache size;
    /// tables tuned this way are not meant to be scanned.
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }

    /// Optimizes the table for large values that are never scanned, by moving
    /// values at or above `min_blob_size` into separate blob files.
    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
            info!("Large value blob storage optimization is disabled via env var.");
            return self;
        }

        // Store large values out of line in compressed, garbage-collected
        // blob files, keeping the LSM tree itself small.
        self.options.set_enable_blob_files(true);
        self.options
            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
        self.options.set_enable_blob_gc(true);
        self.options.set_min_blob_size(min_blob_size);

        // With values stored out of line, the same write buffer holds many
        // more keys, so size the buffers and level targets accordingly.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let target_file_size_base = 64 << 20;
        self.options
            .set_target_file_size_base(target_file_size_base);
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options
            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);

        self
    }

    /// Optimizes the table for read-heavy workloads with a dedicated block
    /// cache of `block_cache_size_mb` and 16 KiB blocks.
    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
        self
    }

    /// Caps the database-wide memtable budget and total WAL size at
    /// `db_max_write_buffer_gb`.
    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
        self.options
            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
        self.options
            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
        self
    }

    /// Tunes the table for high write throughput: larger and more numerous
    /// write buffers, with L0 compaction and stall triggers scaled to match.
    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        // Keep roughly one memtable's worth of recently flushed write
        // history in memory.
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        // Size L1 to roughly the amount of data an L0 compaction moves.
        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Like `optimize_for_write_throughput`, but for tables that see no
    /// deletions: switches to universal compaction, trading slower space
    /// reclamation for lower write amplification.
    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        self.options
            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
        compaction_options.set_max_size_amplification_percent(10000);
        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
        self.options
            .set_universal_compaction_options(&compaction_options);

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Overrides the block-based table options with the given block cache
    /// size (in MiB) and block size (in bytes).
    pub fn set_block_options(
        mut self,
        block_cache_size_mb: usize,
        block_size_bytes: usize,
    ) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(
                block_cache_size_mb,
                block_size_bytes,
            ));
        self
    }

    /// Disables write throttling by removing the pending-compaction byte
    /// limits, for tables where stalling writers is worse than letting
    /// compaction debt build up.
    pub fn disable_write_throttling(mut self) -> DBOptions {
        self.options.set_soft_pending_compaction_bytes_limit(0);
        self.options.set_hard_pending_compaction_bytes_limit(0);
        self
    }
}

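/// Builds the base options applied to every table unless overridden. A sketch
/// of the intended builder-style use; the tuning calls shown are illustrative:
///
/// ```ignore
/// let db_opts = default_db_options()
///     .optimize_for_write_throughput()
///     .set_block_options(64, 16 << 10);
/// ```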
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    // Raise the process file-descriptor limit and let RocksDB use a fraction
    // of it, to avoid "too many open files" errors under load.
    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    // Shard the table cache 2^10 ways to reduce lock contention.
    opt.set_table_cache_num_shard_bits(10);

    // LZ4 everywhere, with Zstd (plus dictionary training) on the bottommost
    // level, where data is coldest and compression ratio matters most.
    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    // 128 MiB block cache with 16 KiB blocks by default.
    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    // Keep a small bloom filter on memtable prefixes to cheapen point
    // lookups of recent writes.
    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}

fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
    let mut block_options = BlockBasedOptions::default();
    block_options.set_block_size(block_size_bytes);
    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
    // Ten bits of bloom filter per key, and pin L0 filter/index blocks in
    // the cache so the hottest metadata is never evicted.
    block_options.set_bloom_filter(10.0, false);
    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
    block_options
}

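/// Opens a database at `path` with the given column families, falling back to
/// `default_db_options` when `db_options` is `None`.
///
/// A minimal sketch; the path and column-family names are hypothetical, and
/// `metric_conf` is assumed to be constructed elsewhere:
///
/// ```ignore
/// let db = open_cf("/tmp/mydb", None, metric_conf, &["objects", "events"])?;
/// ```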
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
pub fn open_cf<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[&str],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let options = db_options.unwrap_or_else(|| default_db_options().options);
    let column_descriptors: Vec<_> = opt_cfs
        .iter()
        .map(|name| (*name, options.clone()))
        .collect();
    open_cf_opts(
        path,
        Some(options),
        metric_conf,
        &column_descriptors[..],
    )
}

fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    options
}

/// Opens a database at `path` with per-column-family options.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    // Any column family that already exists on disk must also be opened, so
    // merge in the ones the caller did not list.
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

/// Opens a database at `path` as an `OptimisticTransactionDB`, which supports
/// optimistic multi-key transactions on top of the usual API.
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts_transactional<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
            &options,
            path,
            cfs.into_iter()
                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
        )
        .map_err(typed_store_err_from_rocks_err)?;
        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

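/// Opens the database at `primary_path` in read-only "secondary" mode, which
/// tails the primary and catches up with it on demand. When `secondary_path`
/// is `None`, a `SECONDARY` directory next to the primary is used.
///
/// A minimal sketch; the paths, `metric_conf`, and `cfs` are assumed to be
/// constructed elsewhere:
///
/// ```ignore
/// let db = open_cf_opts_secondary(primary_path, Some(secondary_path), None, metric_conf, &cfs)?;
/// ```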
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        // A secondary instance keeps every file open, so remove the limit.
        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add any existing column families the caller did not list; RocksDB
        // requires all of them to be opened.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
        )))
    })
}

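/// Lists the column families of the database at `path`, excluding RocksDB's
/// implicit `default` column family.
///
/// A minimal sketch with a hypothetical path:
///
/// ```ignore
/// let tables = list_tables("/tmp/mydb".into())?;
/// println!("column families: {tables:?}");
/// ```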
pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
    const DB_DEFAULT_CF_NAME: &str = "default";

    let opts = rocksdb::Options::default();
    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
        .map_err(|e| e.into())
        .map(|q| {
            q.iter()
                .filter(|&s| s != DB_DEFAULT_CF_NAME)
                .cloned()
                .collect()
        })
}

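/// Serializes a value with bincode using big-endian, fixed-width integer
/// encoding, so the lexicographic order of the produced bytes matches the
/// natural ordering of the keys (which is the order RocksDB iterates in).
///
/// ```ignore
/// let a = be_fix_int_ser(&1u64)?;
/// let b = be_fix_int_ser(&2u64)?;
/// assert!(a < b); // byte order agrees with integer order
/// ```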
#[inline]
pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
where
    S: ?Sized + serde::Serialize,
{
    bincode::DefaultOptions::new()
        .with_big_endian()
        .with_fixint_encoding()
        .serialize(t)
        .map_err(typed_store_err_from_bincode_err)
}

#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
        Self(map)
    }

    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
        self.0.clone()
    }
}

pub enum RocksDBAccessType {
    Primary,
    Secondary(Option<PathBuf>),
}

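/// Destroys all files belonging to the database at `path`. The database must
/// not be open when this is called.
///
/// ```ignore
/// // Hypothetical path; everything under it is irrecoverably removed.
/// safe_drop_db("/tmp/mydb".into())?;
/// ```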
pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
}

// RocksDB refuses to open a database unless every existing column family is
// opened, so fill in any that the caller did not specify with defaults.
fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

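/// Interprets `v` as a big-endian unsigned integer and adds one in place,
/// saturating at the all-`0xFF` maximum instead of wrapping around.
///
/// ```ignore
/// let mut v = vec![0x00, 0xFF];
/// big_endian_saturating_add_one(&mut v);
/// assert_eq!(v, vec![0x01, 0x00]);
/// ```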
fn big_endian_saturating_add_one(v: &mut [u8]) {
    if is_max(v) {
        return;
    }
    for i in (0..v.len()).rev() {
        if v[i] == u8::MAX {
            v[i] = 0;
        } else {
            v[i] += 1;
            break;
        }
    }
}

/// Returns true iff every byte is `0xFF`; the empty slice counts as max.
fn is_max(v: &[u8]) -> bool {
    v.iter().all(|&x| x == u8::MAX)
}

#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
#[test]
fn test_helpers() {
    let v = vec![];
    assert!(is_max(&v));

    fn check_add(v: Vec<u8>) {
        let mut v = v;
        let num = Num32::from_big_endian(&v);
        big_endian_saturating_add_one(&mut v);
        assert!(num + 1 == Num32::from_big_endian(&v));
    }

    uint::construct_uint! {
        struct Num32(4);
    }

    let mut v = vec![255; 32];
    big_endian_saturating_add_one(&mut v);
    assert!(Num32::MAX == Num32::from_big_endian(&v));

    check_add(vec![1; 32]);
    check_add(vec![6; 32]);
    check_add(vec![254; 32]);
}