pub mod errors;
pub(crate) mod iter;
pub(crate) mod keys;
pub(crate) mod safe_iter;
pub mod util;
pub(crate) mod values;

use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashSet},
    env,
    ffi::CStr,
    marker::PhantomData,
    ops::{Bound, RangeBounds},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use bincode::Options;
use collectable::TryExtend;
use iota_macros::{fail_point, nondeterministic};
use itertools::Itertools;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::{
    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
    WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
};
use serde::{Serialize, de::DeserializeOwned};
use tap::TapFallible;
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};

use self::{iter::Iter, keys::Keys, values::Values};
use crate::{
    TypedStoreError,
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    rocks::{
        errors::{
            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
            typed_store_err_from_rocks_err,
        },
        safe_iter::SafeIter,
    },
    traits::{Map, TableSummary},
};

const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

const DB_CORRUPTED_KEY: &[u8] = b"db_corrupted";

#[cfg(test)]
mod tests;

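/// Opens several column families of an existing [`RocksDB`] handle as typed
/// `DBMap`s in a single expression. A minimal usage sketch (illustrative only;
/// the `db` handle and the column-family names are assumptions, not fixed by
/// this crate):
///
/// ```ignore
/// // `db` is an `Arc<RocksDB>` whose column families were registered at
/// // open time, e.g. via `open_cf`.
/// let (users, balances) = reopen!(
///     &db,
///     "users";<u64, String>,
///     "balances";<u64, u128>
/// );
/// ```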
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
            ),*
        )
    };
}

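/// Evaluates the given transaction expression in a loop, retrying with a
/// small randomized backoff whenever it fails with
/// [`TypedStoreError::RetryableTransaction`]. The one-argument form retries
/// at most 20 times; pass `None` to retry forever. The expansion awaits
/// between attempts, so this must run in an async context. A minimal sketch
/// of the intended call shape (the `map`, `key`, and `value` names are
/// illustrative assumptions):
///
/// ```ignore
/// let result = retry_transaction!({
///     let mut txn = map.transaction()?;
///     txn.insert_batch(&map, [(key, value)])?;
///     txn.commit()
/// });
/// ```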
#[macro_export]
macro_rules! retry_transaction {
    ($transaction:expr) => {
        retry_transaction!($transaction, Some(20))
    };

    (
        $transaction:expr,
        $max_retries:expr $(,)?
    ) => {{
        use rand::{
            distributions::{Distribution, Uniform},
            rngs::ThreadRng,
        };
        use tokio::time::{Duration, sleep};
        use tracing::{error, info};

        let mut retries = 0;
        let max_retries = $max_retries;
        loop {
            let status = $transaction;
            match status {
                Err(TypedStoreError::RetryableTransaction) => {
                    retries += 1;
                    let delay = {
                        let mut rng = ThreadRng::default();
                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
                    };
                    if let Some(max_retries) = max_retries {
                        if retries > max_retries {
                            error!(?max_retries, "max retries exceeded");
                            break status;
                        }
                    }
                    if retries > 10 {
                        error!(?delay, ?retries, "excessive transaction retries...");
                    } else {
                        info!(
                            ?delay,
                            ?retries,
                            "transaction write conflict detected, sleeping"
                        );
                    }
                    sleep(delay).await;
                }
                _ => break status,
            }
        }
    }};
}

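/// Convenience wrapper around [`retry_transaction!`] that never gives up
/// (`max_retries = None`): the calling task keeps retrying until the
/// transaction either commits or fails with a non-retryable error.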
#[macro_export]
macro_rules! retry_transaction_forever {
    ($transaction:expr) => {
        $crate::retry_transaction!($transaction, None)
    };
}

#[derive(Debug)]
pub struct DBWithThreadModeWrapper {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl DBWithThreadModeWrapper {
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for DBWithThreadModeWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub struct OptimisticTransactionDBWrapper {
    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl OptimisticTransactionDBWrapper {
    fn new(
        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for OptimisticTransactionDBWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

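/// Unified handle over the two supported RocksDB flavors: a plain
/// multi-threaded DB and an optimistic-transaction DB. Most methods below
/// forward to the underlying database via `delegate_call!`; the
/// transaction-related methods are only usable on the
/// `OptimisticTransactionDB` variant.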
#[derive(Debug)]
pub enum RocksDB {
    DBWithThreadMode(DBWithThreadModeWrapper),
    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
}

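// Forwards a method call to whichever underlying RocksDB variant is active.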
macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
        }
    }
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        delegate_call!(self.cancel_all_background_work(true))
    }
}

impl RocksDB {
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        delegate_call!(self.get(key))
    }

    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        delegate_call!(self.multi_get_cf_opt(keys, readopts))
    }

    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
    }

    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        delegate_call!(self.property_int_value_cf(cf, name))
    }

    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
    }

    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        delegate_call!(self.cf_handle(name))
    }

    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.create_cf(name, opts))
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        delegate_call!(self.drop_cf(name))
    }

    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.delete_file_in_range_cf(cf, from, to))
    }

    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
        fail_point!("delete-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn path(&self) -> &Path {
        delegate_call!(self.path())
    }

    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
        fail_point!("put-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
    }

    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        delegate_call!(self.try_catch_up_with_primary())
    }

    pub fn write(
        &self,
        batch: RocksDBBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (self, batch) {
            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            _ => Err(TypedStoreError::RocksDB(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn transaction_without_snapshot(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported by OptimisticTransactionDB")
            }
        }
    }

    pub fn transaction(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => {
                let mut tx_opts = OptimisticTransactionOptions::new();
                tx_opts.set_snapshot(true);

                Ok(db
                    .underlying
                    .transaction_opt(&WriteOptions::default(), &tx_opts))
            }
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported by OptimisticTransactionDB")
            }
        }
    }

    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
            }
            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
            ),
        }
    }

    pub fn iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
        mode: IteratorMode<'_>,
    ) -> RocksDBIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
            }
            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
            ),
        }
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        delegate_call!(self.compact_range_cf(cf, start, end))
    }

    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
        match self {
            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
            Self::OptimisticTransactionDB(d) => {
                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
            }
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint = match self {
            Self::DBWithThreadMode(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
            Self::OptimisticTransactionDB(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
        };
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        delegate_call!(self.flush_cf(cf))
    }

    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.set_options_cf(cf, opts))
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
        }
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
        }
    }

    pub fn db_name(&self) -> String {
        let name = match self {
            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
        };
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        delegate_call!(self.live_files())
    }
}

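/// Crash-recovery marker. If the marker key is already set to `1`, the
/// previous run presumably did not shut down cleanly, so the store is treated
/// as corrupted and an error is returned; if the marker is absent, it is
/// written as `1`, to be reset to `0` on clean shutdown via
/// [`unmark_db_corruption`]. A minimal startup/shutdown sketch (the path and
/// the ordering of calls are assumptions about the caller, not enforced
/// here):
///
/// ```ignore
/// let path = std::path::Path::new("/tmp/mydb");
/// check_and_mark_db_corruption(path)?; // refuses to start if marked dirty
/// // ... run the node ...
/// unmark_db_corruption(path)?; // clean shutdown: clear the marker
/// ```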
pub fn check_and_mark_db_corruption(path: &Path) -> Result<(), String> {
    let db = rocksdb::DB::open_default(path).map_err(|e| e.to_string())?;

    db.get(DB_CORRUPTED_KEY)
        .map_err(|e| format!("Failed to open database: {e}"))
        .and_then(|value| match value {
            Some(v) if v.first() == Some(&1) => Err(
                "Database is corrupted, please remove the current database and start clean!"
                    .to_string(),
            ),
            Some(_) => Ok(()),
            None => db
                .put(DB_CORRUPTED_KEY, [1])
                .map_err(|e| format!("Failed to set corrupted key in database: {e}")),
        })?;

    Ok(())
}

pub fn unmark_db_corruption(path: &Path) -> Result<(), Error> {
    rocksdb::DB::open_default(path)?.put(DB_CORRUPTED_KEY, [0])
}

pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}

impl<'a> RocksDBSnapshot<'a> {
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }

    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}

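/// Write batch matching the two database flavors in [`RocksDB`]: a plain
/// `WriteBatch` for `DBWithThreadMode` and a transaction-aware batch for
/// `OptimisticTransactionDB`. [`RocksDB::write`] rejects a batch applied to
/// the wrong flavor.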
pub enum RocksDBBatch {
    Regular(rocksdb::WriteBatch),
    Transactional(rocksdb::WriteBatchWithTransaction<true>),
}

macro_rules! delegate_batch_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::Regular(b) => b.$method($($args),*),
            Self::Transactional(b) => b.$method($($args),*),
        }
    }
}

impl RocksDBBatch {
    fn size_in_bytes(&self) -> usize {
        delegate_batch_call!(self.size_in_bytes())
    }

    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        delegate_batch_call!(self.delete_cf(cf, key))
    }

    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.put_cf(cf, key, value))
    }

    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.merge_cf(cf, key, value))
    }

    pub fn delete_range_cf<K: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), TypedStoreError> {
        match self {
            Self::Regular(batch) => {
                batch.delete_range_cf(cf, from, to);
                Ok(())
            }
            Self::Transactional(_) => {
                panic!("delete_range_cf is not supported for transactional batches")
            }
        }
    }
}

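/// Per-database metrics configuration: the name under which metrics are
/// reported and the sampling intervals used for read/write/iterator
/// perf-context sampling.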
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    _phantom: PhantomData<fn(K) -> V>,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Terminating the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

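    /// Opens a database at `path` with a single column family (the default
    /// one unless `opt_cf` is given) and wraps it in a typed map. A minimal
    /// sketch (the path and the key/value types are illustrative
    /// assumptions):
    ///
    /// ```ignore
    /// let map: DBMap<u64, String> = DBMap::open(
    ///     "/tmp/mydb",
    ///     MetricConf::new("mydb"),
    ///     None, // default rocksdb::Options
    ///     None, // default column family
    ///     &ReadWriteOptions::default(),
    /// )?;
    /// ```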
    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }

    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

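    /// Starts an empty write batch of the flavor matching the underlying
    /// database; writes staged in it are applied atomically by
    /// [`DBBatch::write`]. A minimal sketch (the key/value literals are
    /// illustrative):
    ///
    /// ```ignore
    /// let mut batch = map.batch();
    /// batch.insert_batch(&map, [(1u64, "a".to_string())])?;
    /// batch.delete_batch(&map, [2u64])?;
    /// batch.write()?; // both operations land atomically
    /// ```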
    pub fn batch(&self) -> DBBatch {
        let batch = match *self.rocksdb {
            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
            RocksDB::OptimisticTransactionDB(_) => {
                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
            }
        };
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
        self.rocksdb
            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                false,
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let Some(cf) = rocksdb.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                rocksdb.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new(&self.rocksdb)
    }

    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new_without_snapshot(&self.rocksdb)
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
        Ok(self.rocksdb.snapshot())
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let iter = self.iterator_cf().map(Result::unwrap);
        for (key, value) in iter {
            num_keys += 1;
            key_bytes_total += key.len();
            value_bytes_total += value.len();
            key_hist.record(key.len() as u64)?;
            value_hist.record(value.len() as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

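    /// Translates a Rust `RangeBounds<K>` into RocksDB iterate bounds.
    /// RocksDB lower bounds are inclusive and upper bounds exclusive, so an
    /// excluded lower bound and an included upper bound are shifted by one
    /// key in big-endian byte space via `big_endian_saturating_add_one`; an
    /// included upper bound whose encoding is already the maximum key is
    /// left unbounded rather than overflowed.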
    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                if !is_max(&key_buf) {
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}

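/// A batch of deletes/inserts/merges staged against one or more `DBMap`s
/// sharing the same underlying database, applied atomically by
/// [`DBBatch::write`]. Each mutating method checks `Arc::ptr_eq` on the
/// database handle and fails with `TypedStoreError::CrossDBBatch` if a map
/// from a different database is passed in. A minimal sketch over two maps of
/// the same database (the `users`/`logins` names are illustrative
/// assumptions):
///
/// ```ignore
/// let mut batch = users.batch();
/// batch
///     .insert_batch(&users, [(7u64, "alice".to_string())])?
///     .insert_batch(&logins, [(7u64, 0u32)])?;
/// batch.write()?; // all-or-nothing across both column families
/// ```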
pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: RocksDBBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: RocksDBBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}

impl DBBatch {
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);

                Ok(())
            })?;
        Ok(())
    }

    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from)?;
        let to_buf = be_fix_int_ser(to)?;

        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                self.batch.put_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.batch.merge_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        Ok(self)
    }

    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, B)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.merge_cf(&db.cf(), k_buf, v);
                Ok(())
            })?;
        Ok(self)
    }
}

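/// An optimistic transaction over [`RocksDB::OptimisticTransactionDB`].
/// Reads via [`DBTransaction::get_for_update`] register conflict checks, and
/// [`DBTransaction::commit`] maps RocksDB `Busy`/`TryAgain` errors into
/// [`TypedStoreError::RetryableTransaction`] so it composes with
/// [`retry_transaction!`]. A minimal read-modify-write sketch (`counters`
/// and `key` are illustrative assumptions):
///
/// ```ignore
/// let result = retry_transaction!({
///     let mut txn = counters.transaction()?;
///     let current: Option<u64> = txn.get_for_update(&counters, &key)?;
///     txn.insert_batch(&counters, [(key, current.unwrap_or(0) + 1)])?;
///     txn.commit()
/// });
/// ```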
pub struct DBTransaction<'a> {
    rocksdb: Arc<RocksDB>,
    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
}

impl<'a> DBTransaction<'a> {
    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction()?,
        })
    }

    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction_without_snapshot()?,
        })
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.transaction
                    .put_cf(&db.cf(), k_buf, v_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.transaction
                    .delete_cf(&db.cf(), k_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn snapshot(
        &self,
    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
    {
        self.transaction.snapshot()
    }

    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let k_buf = be_fix_int_ser(key)?;
        match self
            .transaction
            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?
        {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        self.transaction
            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
    }

    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError> {
        let cf = db.cf();
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
            .collect();

        let results = self
            .transaction
            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());

        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(
                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
                    Some(data) => Ok(Some(
                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                    )),
                    None => Ok(None),
                },
            )
            .collect();

        values_parsed
    }

    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Iter<'a, K, V> {
        let db_iter = self
            .transaction
            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
        Iter::new(
            db.cf.clone(),
            RocksDBRawIter::OptimisticTransaction(db_iter),
            None,
            None,
            None,
            None,
            None,
        )
    }

    pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Keys<'a, K> {
        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
            self.transaction
                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
        );
        db_iter.seek_to_first();

        Keys::new(db_iter)
    }

    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Values<'a, V> {
        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
            self.transaction
                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
        );
        db_iter.seek_to_first();

        Values::new(db_iter)
    }

    pub fn commit(self) -> Result<(), TypedStoreError> {
        fail_point!("transaction-commit");
        self.transaction.commit().map_err(|e| match e.kind() {
            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
            _ => typed_store_err_from_rocks_err(e),
        })?;
        Ok(())
    }
}

macro_rules! delegate_iter_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DB(db) => db.$method($($args),*),
            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
            Self::OptimisticTransaction(db) => db.$method($($args),*),
        }
    }
}

pub enum RocksDBRawIter<'a> {
    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
    OptimisticTransaction(
        rocksdb::DBRawIteratorWithThreadMode<
            'a,
            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
        >,
    ),
}

impl RocksDBRawIter<'_> {
    pub fn valid(&self) -> bool {
        delegate_iter_call!(self.valid())
    }
    pub fn key(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.key())
    }
    pub fn value(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.value())
    }
    pub fn next(&mut self) {
        delegate_iter_call!(self.next())
    }
    pub fn prev(&mut self) {
        delegate_iter_call!(self.prev())
    }
    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek(key))
    }
    pub fn seek_to_last(&mut self) {
        delegate_iter_call!(self.seek_to_last())
    }
    pub fn seek_to_first(&mut self) {
        delegate_iter_call!(self.seek_to_first())
    }
    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek_for_prev(key))
    }
    pub fn status(&self) -> Result<(), rocksdb::Error> {
        delegate_iter_call!(self.status())
    }
}

pub enum RocksDBIter<'a> {
    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
}

impl Iterator for RocksDBIter<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::DB(db) => db.next(),
            Self::OptimisticTransactionDB(db) => db.next(),
        }
    }
}

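/// The typed key/value [`Map`] API over a single column family: keys are
/// encoded with big-endian fixed-int `bincode` (so lexicographic byte order
/// matches key order) and values with BCS. A minimal sketch of typical use
/// (map construction elided; see [`DBMap::open`]):
///
/// ```ignore
/// map.insert(&1u64, &"a".to_string())?;
/// assert_eq!(map.get(&1u64)?, Some("a".to_string()));
/// assert!(map.contains_key(&1u64)?);
/// map.remove(&1u64)?;
/// ```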
1915impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1916where
1917 K: Serialize + DeserializeOwned,
1918 V: Serialize + DeserializeOwned,
1919{
1920 type Error = TypedStoreError;
1921 type Iterator = Iter<'a, K, V>;
1922 type SafeIterator = SafeIter<'a, K, V>;
1923 type Keys = Keys<'a, K>;
1924 type Values = Values<'a, V>;
1925
1926 #[instrument(level = "trace", skip_all, err)]
1927 fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1928 let key_buf = be_fix_int_ser(key)?;
1929 let readopts = self.opts.readopts();
1932 Ok(self
1933 .rocksdb
1934 .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1935 && self
1936 .rocksdb
1937 .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1938 .map_err(typed_store_err_from_rocks_err)?
1939 .is_some())
1940 }
1941
1942 #[instrument(level = "trace", skip_all, err)]
1943 fn multi_contains_keys<J>(
1944 &self,
1945 keys: impl IntoIterator<Item = J>,
1946 ) -> Result<Vec<bool>, Self::Error>
1947 where
1948 J: Borrow<K>,
1949 {
1950 let values = self.multi_get_pinned(keys)?;
1951 Ok(values.into_iter().map(|v| v.is_some()).collect())
1952 }
1953
1954 #[instrument(level = "trace", skip_all, err)]
1955 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1956 let _timer = self
1957 .db_metrics
1958 .op_metrics
1959 .rocksdb_get_latency_seconds
1960 .with_label_values(&[&self.cf])
1961 .start_timer();
1962 let perf_ctx = if self.get_sample_interval.sample() {
1963 Some(RocksDBPerfContext)
1964 } else {
1965 None
1966 };
1967 let key_buf = be_fix_int_ser(key)?;
1968 let res = self
1969 .rocksdb
1970 .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1971 .map_err(typed_store_err_from_rocks_err)?;
1972 self.db_metrics
1973 .op_metrics
1974 .rocksdb_get_bytes
1975 .with_label_values(&[&self.cf])
1976 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1977 if perf_ctx.is_some() {
1978 self.db_metrics
1979 .read_perf_ctx_metrics
1980 .report_metrics(&self.cf);
1981 }
1982 match res {
1983 Some(data) => Ok(Some(
1984 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1985 )),
1986 None => Ok(None),
1987 }
1988 }
1989
1990 #[instrument(level = "trace", skip_all, err)]
1991 fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
1992 let _timer = self
1993 .db_metrics
1994 .op_metrics
1995 .rocksdb_get_latency_seconds
1996 .with_label_values(&[&self.cf])
1997 .start_timer();
1998 let perf_ctx = if self.get_sample_interval.sample() {
1999 Some(RocksDBPerfContext)
2000 } else {
2001 None
2002 };
2003 let key_buf = be_fix_int_ser(key)?;
2004 let res = self
2005 .rocksdb
2006 .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
2007 .map_err(typed_store_err_from_rocks_err)?;
2008 self.db_metrics
2009 .op_metrics
2010 .rocksdb_get_bytes
2011 .with_label_values(&[&self.cf])
2012 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
2013 if perf_ctx.is_some() {
2014 self.db_metrics
2015 .read_perf_ctx_metrics
2016 .report_metrics(&self.cf);
2017 }
2018 match res {
2019 Some(data) => Ok(Some(data.to_vec())),
2020 None => Ok(None),
2021 }
2022 }
2023
2024 #[instrument(level = "trace", skip_all, err)]
2025 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
2026 let timer = self
2027 .db_metrics
2028 .op_metrics
2029 .rocksdb_put_latency_seconds
2030 .with_label_values(&[&self.cf])
2031 .start_timer();
2032 let perf_ctx = if self.write_sample_interval.sample() {
2033 Some(RocksDBPerfContext)
2034 } else {
2035 None
2036 };
2037 let key_buf = be_fix_int_ser(key)?;
2038 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
2039 self.db_metrics
2040 .op_metrics
2041 .rocksdb_put_bytes
2042 .with_label_values(&[&self.cf])
2043 .observe((key_buf.len() + value_buf.len()) as f64);
2044 if perf_ctx.is_some() {
2045 self.db_metrics
2046 .write_perf_ctx_metrics
2047 .report_metrics(&self.cf);
2048 }
2049 self.rocksdb
2050 .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
2051 .map_err(typed_store_err_from_rocks_err)?;
2052
2053 let elapsed = timer.stop_and_record();
2054 if elapsed > 1.0 {
2055 warn!(?elapsed, cf = ?self.cf, "very slow insert");
2056 self.db_metrics
2057 .op_metrics
2058 .rocksdb_very_slow_puts_count
2059 .with_label_values(&[&self.cf])
2060 .inc();
2061 self.db_metrics
2062 .op_metrics
2063 .rocksdb_very_slow_puts_duration_ms
2064 .with_label_values(&[&self.cf])
2065 .inc_by((elapsed * 1000.0) as u64);
2066 }
2067
2068 Ok(())
2069 }
2070
2071 #[instrument(level = "trace", skip_all, err)]
2072 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
2073 let _timer = self
2074 .db_metrics
2075 .op_metrics
2076 .rocksdb_delete_latency_seconds
2077 .with_label_values(&[&self.cf])
2078 .start_timer();
2079 let perf_ctx = if self.write_sample_interval.sample() {
2080 Some(RocksDBPerfContext)
2081 } else {
2082 None
2083 };
2084 let key_buf = be_fix_int_ser(key)?;
2085 self.rocksdb
2086 .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
2087 .map_err(typed_store_err_from_rocks_err)?;
2088 self.db_metrics
2089 .op_metrics
2090 .rocksdb_deletes
2091 .with_label_values(&[&self.cf])
2092 .inc();
2093 if perf_ctx.is_some() {
2094 self.db_metrics
2095 .write_perf_ctx_metrics
2096 .report_metrics(&self.cf);
2097 }
2098 Ok(())
2099 }
2100
2101 #[instrument(level = "trace", skip_all, err)]
2111 fn delete_file_in_range(&self, from: &K, to: &K) -> Result<(), TypedStoreError> {
2112 let from_buf = be_fix_int_ser(from.borrow())?;
2113 let to_buf = be_fix_int_ser(to.borrow())?;
2114 self.rocksdb
2115 .delete_file_in_range(&self.cf(), from_buf, to_buf)
2116 .map_err(typed_store_err_from_rocks_err)?;
2117 Ok(())
2118 }
2119
2120 #[instrument(level = "trace", skip_all, err)]
2125 fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
2126 let _ = self.rocksdb.drop_cf(&self.cf);
2127 self.rocksdb
2128 .create_cf(self.cf.clone(), &default_db_options().options)
2129 .map_err(typed_store_err_from_rocks_err)?;
2130 Ok(())
2131 }
2132
2133 #[instrument(level = "trace", skip_all, err)]
2142 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2143 let mut iter = self.unbounded_iter().seek_to_first();
2144 let first_key = iter.next().map(|(k, _v)| k);
2145 let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
2146 if let Some((first_key, last_key)) = first_key.zip(last_key) {
2147 let mut batch = self.batch();
2148 batch.schedule_delete_range(self, &first_key, &last_key)?;
2149 batch.write()?;
2150 }
2151 Ok(())
2152 }
2153
2154 fn is_empty(&self) -> bool {
2155 self.safe_iter().next().is_none()
2156 }
2157
2158 fn unbounded_iter(&'a self) -> Self::Iterator {
2161 let db_iter = self
2162 .rocksdb
2163 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2164 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2165 Iter::new(
2166 self.cf.clone(),
2167 db_iter,
2168 _timer,
2169 _perf_ctx,
2170 bytes_scanned,
2171 keys_scanned,
2172 Some(self.db_metrics.clone()),
2173 )
2174 }
2175
2176 fn iter_with_bounds(
2180 &'a self,
2181 lower_bound: Option<K>,
2182 upper_bound: Option<K>,
2183 ) -> Self::Iterator {
2184 let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2185 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2186 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2187 Iter::new(
2188 self.cf.clone(),
2189 db_iter,
2190 _timer,
2191 _perf_ctx,
2192 bytes_scanned,
2193 keys_scanned,
2194 Some(self.db_metrics.clone()),
2195 )
2196 }
2197
    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
        let readopts = self.create_read_options_with_range(range);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        Iter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_iter(&'a self) -> Self::SafeIterator {
        let db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_range(range);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn keys(&'a self) -> Self::Keys {
        let mut db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        db_iter.seek_to_first();

        Keys::new(db_iter)
    }

    fn values(&'a self) -> Self::Values {
        let mut db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        db_iter.seek_to_first();

        Values::new(db_iter)
    }

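    /// Returns the raw serialized bytes for each key, in input order, with
    /// `None` for keys that are absent. A minimal sketch, assuming a
    /// hypothetical `table: DBMap<u64, String>`:
    ///
    /// ```ignore
    /// let raw = table.multi_get_raw_bytes([1, 2, 3])?;
    /// assert_eq!(raw.len(), 3);
    /// ```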
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get_raw_bytes<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self
            .multi_get_pinned(keys)?
            .into_iter()
            .map(|val| val.map(|v| v.to_vec()))
            .collect();
        Ok(results)
    }

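    /// Returns the deserialized values for each key, in input order, with
    /// `None` for keys that are absent. A minimal sketch, assuming a
    /// hypothetical `table: DBMap<u64, String>`:
    ///
    /// ```ignore
    /// let values = table.multi_get([1, 2])?;
    /// if let Some(v) = &values[0] {
    ///     println!("key 1 -> {v}");
    /// }
    /// ```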
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

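    /// Like `multi_get`, but reads the keys in chunks of `chunk_size` over a
    /// single snapshot, bounding peak memory while keeping all reads
    /// consistent with one another. A minimal sketch, assuming a
    /// hypothetical `table: DBMap<u64, String>`:
    ///
    /// ```ignore
    /// let values = table.chunked_multi_get(0..10_000u64, 500)?;
    /// ```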
    #[instrument(level = "trace", skip_all, err)]
    fn chunked_multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
        chunk_size: usize,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let cf = self.cf();
        let keys_bytes = keys
            .into_iter()
            .map(|k| (&cf, be_fix_int_ser(k.borrow()).unwrap()));
        let chunked_keys = keys_bytes.into_iter().chunks(chunk_size);
        let snapshot = self.snapshot()?;
        let mut results = vec![];
        for chunk in chunked_keys.into_iter() {
            let chunk_result = snapshot.multi_get_cf(chunk);
            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
                .into_iter()
                .map(|value_byte| {
                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
                    match value_byte {
                        Some(data) => Ok(Some(
                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                        )),
                        None => Ok(None),
                    }
                })
                .collect();
            results.extend(values_parsed?);
        }
        Ok(results)
    }

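    /// Inserts a batch of key-value pairs atomically via a single write
    /// batch. A minimal sketch, assuming a hypothetical
    /// `table: DBMap<u64, String>`:
    ///
    /// ```ignore
    /// table.multi_insert([(1, "a".to_string()), (2, "b".to_string())])?;
    /// ```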
    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

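    /// Removes a batch of keys atomically via a single write batch. A
    /// minimal sketch, assuming a hypothetical `table: DBMap<u64, String>`:
    ///
    /// ```ignore
    /// table.multi_remove([1, 2, 3])?;
    /// ```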
    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

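    /// For a database opened in secondary (read-only follower) mode, catches
    /// the local instance up with the primary's latest flushed state. A
    /// minimal sketch, assuming a hypothetical secondary-mode
    /// `table: DBMap<u64, String>`:
    ///
    /// ```ignore
    /// table.try_catch_up_with_primary()?;
    /// ```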
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        self.rocksdb
            .try_catch_up_with_primary()
            .map_err(typed_store_err_from_rocks_err)
    }
}

impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
where
    J: Borrow<K>,
    U: Borrow<V>,
    K: Serialize,
    V: Serialize,
{
    type Error = TypedStoreError;

    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
    where
        T: Iterator<Item = (J, U)>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, iter)?;
        batch.write()
    }

    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
        let mut batch = self.batch();
        batch.insert_batch(self, slice_of_refs)?;
        batch.write()
    }
}

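/// Reads a `usize` from the given environment variable, returning `None` if
/// the variable is unset or does not parse; a parse failure is also logged
/// as a warning. A minimal sketch:
///
/// ```ignore
/// std::env::set_var("MAX_WRITE_BUFFER_SIZE_MB", "256");
/// assert_eq!(read_size_from_env("MAX_WRITE_BUFFER_SIZE_MB"), Some(256));
/// ```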
pub fn read_size_from_env(var_name: &str) -> Option<usize> {
    env::var(var_name)
        .ok()?
        .parse::<usize>()
        .tap_err(|e| {
            warn!(
                "Env var {} does not contain valid usize integer: {}",
                var_name, e
            )
        })
        .ok()
}

#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    pub ignore_range_deletions: bool,
    // When true, every write is synced to disk before it is acknowledged;
    // defaults to the IOTA_DB_SYNC_TO_DISK env var (see `Default` below).
    sync_to_disk: bool,
}

impl ReadWriteOptions {
    pub fn readopts(&self) -> ReadOptions {
        let mut readopts = ReadOptions::default();
        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
        readopts
    }

    pub fn writeopts(&self) -> WriteOptions {
        let mut opts = WriteOptions::default();
        opts.set_sync(self.sync_to_disk);
        opts
    }

    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
        self.ignore_range_deletions = ignore;
        self
    }
}

impl Default for ReadWriteOptions {
    fn default() -> Self {
        Self {
            ignore_range_deletions: true,
            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
        }
    }
}
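
/// Bundle of RocksDB options plus the typed-store read/write options for a
/// column family. The builder methods below can be chained; a minimal
/// sketch:
///
/// ```ignore
/// let opts = default_db_options()
///     .optimize_for_write_throughput()
///     .optimize_for_read(512);
/// ```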
#[derive(Default, Clone)]
pub struct DBOptions {
    pub options: rocksdb::Options,
    pub rw_options: ReadWriteOptions,
}

impl DBOptions {
    /// Optimizes the table for point lookups when no scans are expected.
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }

    /// Optimizes the table for large values by separating them into blob
    /// files, keeping the LSM tree itself small.
    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
            info!("Large value blob storage optimization is disabled via env var.");
            return self;
        }

        // Blob settings: store values >= min_blob_size outside the SST
        // files, compressed with Lz4, and garbage-collect stale blobs.
        self.options.set_enable_blob_files(true);
        self.options
            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
        self.options.set_enable_blob_gc(true);
        self.options.set_min_blob_size(min_blob_size);

        // With large values out of the LSM tree, adjust the write buffer,
        // target file size and base level size accordingly.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let target_file_size_base = 64 << 20;
        self.options
            .set_target_file_size_base(target_file_size_base);
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options
            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);

        self
    }

    /// Optimizes the table for reads by sizing the block cache (in MiB)
    /// with 16 KiB blocks.
    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
        self
    }

    /// Optimizes the whole database for write throughput by capping the
    /// total memtable and WAL budgets (in GiB).
    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
        self.options
            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
        self.options
            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
        self
    }

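    /// Optimizes a single table for write throughput: larger and more
    /// numerous memtables, relaxed L0 compaction triggers, and a larger
    /// target SST file size, each overridable via the corresponding env
    /// var. A minimal sketch:
    ///
    /// ```ignore
    /// let opts = default_db_options().optimize_for_write_throughput();
    /// ```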
    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
        // Increase the write buffer (memtable) size and count.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());

        // Relax the L0 file-count triggers for compaction, slowdown and stop.
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        // Size L1 to roughly match the data that can accumulate in L0.
        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Like `optimize_for_write_throughput`, but for tables with few or no
    /// deletions: switches to universal compaction, which trades lower
    /// write amplification for potentially higher space usage.
    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
        // Increase the write buffer (memtable) size and count.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());

        // Universal compaction with a generous size-amplification budget.
        self.options
            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
        compaction_options.set_max_size_amplification_percent(10000);
        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
        self.options
            .set_universal_compaction_options(&compaction_options);

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Overrides the block options with the given block cache size (MiB)
    /// and block size (bytes).
    pub fn set_block_options(
        mut self,
        block_cache_size_mb: usize,
        block_size_bytes: usize,
    ) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(
                block_cache_size_mb,
                block_size_bytes,
            ));
        self
    }

    /// Disables write throttling by removing the pending-compaction byte
    /// limits.
    pub fn disable_write_throttling(mut self) -> DBOptions {
        self.options.set_soft_pending_compaction_bytes_limit(0);
        self.options.set_hard_pending_compaction_bytes_limit(0);
        self
    }
}

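/// Builds the default `DBOptions` used when a caller does not provide its
/// own: Lz4/Zstd compression, env-tunable write buffer and WAL budgets, a
/// shared block cache, and a memtable bloom filter. A minimal sketch:
///
/// ```ignore
/// let db_options = default_db_options().options;
/// ```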
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    // Keep a reserve of file descriptors for the rest of the process: one
    // eighth of the raised limit goes to RocksDB.
    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    opt.set_table_cache_num_shard_bits(10);

    // Lz4 for the hot levels, Zstd (with dictionary training) for the
    // bottommost level.
    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    // 128 MiB block cache with 16 KiB blocks.
    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    // Memtable bloom filter to speed up point lookups against the memtable.
    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}

fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
    let mut block_options = BlockBasedOptions::default();
    // Override the default block size (4 KiB).
    block_options.set_block_size(block_size_bytes);
    // Configure a dedicated LRU block cache of the requested size.
    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
    // Bloom filters to cut read amplification on point lookups.
    block_options.set_bloom_filter(10.0, false);
    // Keep L0 filter and index blocks pinned in the cache.
    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
    block_options
}

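/// Opens a database at `path` with the given options and set of column
/// families, creating the database and any missing column families; every
/// column family shares the same options. A minimal sketch, assuming a
/// `metric_conf: MetricConf` constructed elsewhere:
///
/// ```ignore
/// let db = open_cf("/tmp/db", None, metric_conf, &["table_a", "table_b"])?;
/// ```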
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
pub fn open_cf<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[&str],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let options = db_options.unwrap_or_else(|| default_db_options().options);
    let column_descriptors: Vec<_> = opt_cfs
        .iter()
        .map(|name| (*name, options.clone()))
        .collect();
    open_cf_opts(
        path,
        Some(options.clone()),
        metric_conf,
        &column_descriptors[..],
    )
}

fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
    // Customize database options.
    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    options
}

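/// Opens a database at `path` with per-column-family options; column
/// families present on disk but missing from `opt_cfs` are opened with
/// default options, and listed ones are created if absent. A minimal
/// sketch, assuming a `metric_conf: MetricConf` constructed elsewhere:
///
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/db",
///     None,
///     metric_conf,
///     &[("table_a", default_db_options().options)],
/// )?;
/// ```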
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    // In the simulator, the wall clock is intercepted in the test thread
    // only. RocksDB would otherwise spawn its background threads under the
    // simulated clock while those threads observe the real clock. The
    // `nondeterministic!` macro evaluates the block on a fresh thread to
    // avoid this; it is a no-op outside the simulator.
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

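/// Same as `open_cf_opts`, but opens the database as an
/// `OptimisticTransactionDB`, enabling transactional access. A minimal
/// sketch, assuming a `metric_conf: MetricConf` constructed elsewhere:
///
/// ```ignore
/// let db = open_cf_opts_transactional("/tmp/db", None, metric_conf, &[])?;
/// ```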
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts_transactional<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        // See `open_cf_opts` for why `nondeterministic!` is necessary here.
        let options = prepare_db_options(db_options);
        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
            &options,
            path,
            cfs.into_iter()
                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
        )
        .map_err(typed_store_err_from_rocks_err)?;
        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

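/// Opens the database at `primary_path` in read-only secondary mode,
/// placing the secondary instance's own files at `secondary_path` (or a
/// `SECONDARY` directory next to the primary if unset). A minimal sketch,
/// assuming a `metric_conf: MetricConf` constructed elsewhere:
///
/// ```ignore
/// let db = open_cf_opts_secondary("/tmp/db", None, None, metric_conf, &[])?;
/// ```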
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        // See `open_cf_opts` for why `nondeterministic!` is necessary here.
        // Customize database options.
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        // RocksDB requires an unlimited open-file budget when opening in
        // secondary mode.
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add column families not explicitly listed by the caller.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
        )))
    })
}

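/// Lists the column families (tables) of the database at `path`, excluding
/// RocksDB's built-in `default` column family. A minimal sketch:
///
/// ```ignore
/// for table in list_tables("/tmp/db".into())? {
///     println!("{table}");
/// }
/// ```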
pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
    const DB_DEFAULT_CF_NAME: &str = "default";

    let opts = rocksdb::Options::default();
    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
        .map_err(|e| e.into())
        .map(|q| {
            q.iter()
                .filter_map(|s| {
                    // The `default` table is not used.
                    if s != DB_DEFAULT_CF_NAME {
                        Some(s.clone())
                    } else {
                        None
                    }
                })
                .collect()
        })
}

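/// Serializes a value with bincode using big-endian, fixed-width integer
/// encoding, so the byte order of serialized integer keys matches their
/// numeric order. RocksDB's lexicographic key ordering and the range/bound
/// iterators above rely on this property. A minimal sketch:
///
/// ```ignore
/// let a = be_fix_int_ser(&1u64)?;
/// let b = be_fix_int_ser(&2u64)?;
/// assert!(a < b); // byte order matches numeric order
/// ```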
#[inline]
pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
where
    S: ?Sized + serde::Serialize,
{
    bincode::DefaultOptions::new()
        .with_big_endian()
        .with_fixint_encoding()
        .serialize(t)
        .map_err(typed_store_err_from_bincode_err)
}

#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
        Self(map)
    }

    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
        self.0.clone()
    }
}

pub enum RocksDBAccessType {
    Primary,
    Secondary(Option<PathBuf>),
}

pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
}

fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

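/// Increments a big-endian byte slice by one in place, saturating at the
/// all-0xFF maximum instead of wrapping. This turns an inclusive upper
/// bound on serialized keys into the next key for exclusive-bound APIs.
/// A minimal sketch:
///
/// ```ignore
/// let mut v = [0x00, 0xFF];
/// big_endian_saturating_add_one(&mut v);
/// assert_eq!(v, [0x01, 0x00]);
/// ```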
fn big_endian_saturating_add_one(v: &mut [u8]) {
    if is_max(v) {
        return;
    }
    for i in (0..v.len()).rev() {
        if v[i] == u8::MAX {
            v[i] = 0;
        } else {
            v[i] += 1;
            break;
        }
    }
}

/// Returns whether every byte is at its maximum (trivially true for the
/// empty slice).
fn is_max(v: &[u8]) -> bool {
    v.iter().all(|&x| x == u8::MAX)
}

#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
#[test]
fn test_helpers() {
    let v = vec![];
    assert!(is_max(&v));

    fn check_add(v: Vec<u8>) {
        let mut v = v;
        let num = Num32::from_big_endian(&v);
        big_endian_saturating_add_one(&mut v);
        assert!(num + 1 == Num32::from_big_endian(&v));
    }

    uint::construct_uint! {
        // 256-bit unsigned integer (4 x 64-bit words), i.e. 32 bytes.
        struct Num32(4);
    }

    let mut v = vec![255; 32];
    big_endian_saturating_add_one(&mut v);
    assert!(Num32::MAX == Num32::from_big_endian(&v));

    check_add(vec![1; 32]);
    check_add(vec![6; 32]);
    check_add(vec![254; 32]);
}