1pub mod errors;
6pub(crate) mod iter;
7pub(crate) mod keys;
8pub(crate) mod safe_iter;
9pub mod util;
10pub(crate) mod values;
11
12use std::{
13 borrow::Borrow,
14 collections::{BTreeMap, HashSet},
15 env,
16 ffi::CStr,
17 marker::PhantomData,
18 ops::{Bound, RangeBounds},
19 path::{Path, PathBuf},
20 sync::Arc,
21 time::Duration,
22};
23
24use bincode::Options;
25use collectable::TryExtend;
26use iota_macros::{fail_point, nondeterministic};
27use itertools::Itertools;
28use prometheus::{Histogram, HistogramTimer};
29use rocksdb::{
30 AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
31 ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
32 IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
33 ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
34 WriteOptions, checkpoint::Checkpoint, properties, properties::num_files_at_level,
35};
36use serde::{Serialize, de::DeserializeOwned};
37use tap::TapFallible;
38use tokio::sync::oneshot;
39use tracing::{debug, error, info, instrument, warn};
40
41use self::{iter::Iter, keys::Keys, values::Values};
42use crate::{
43 TypedStoreError,
44 metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
45 rocks::{
46 errors::{
47 typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
48 typed_store_err_from_rocks_err,
49 },
50 safe_iter::SafeIter,
51 },
52 traits::{Map, TableSummary},
53};
54
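// The constants below pair environment-variable names with default values for RocksDB tuning
// knobs (write buffer size, WAL size, compaction triggers, blob storage, parallelism); the
// `_MB` suffixes denote sizes in megabytes.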
55const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
58const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;
59
60const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
63const DEFAULT_DB_WAL_SIZE: usize = 1024;
64
65const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
68const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
69const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
70const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
71const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
72const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
73const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
74const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
75const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;
76
77const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
79
80const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
81
82const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
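    // SAFETY: the byte string below is NUL-terminated and contains no interior NUL bytes,
    // which is what `CStr::from_bytes_with_nul_unchecked` requires.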
85 unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
86
87#[cfg(test)]
88mod tests;
89
90#[macro_export]
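/// Reopens a list of column families on an already-opened database handle and returns one
/// `DBMap` per column family, in the order given. Each column family must already exist on the
/// handle; otherwise the macro panics via `expect`.
///
/// A minimal usage sketch (not taken from the crate's own docs): it assumes the database was
/// opened with `open_cf` listing the same column families, and that `DBMap` and
/// `ReadWriteOptions` are in scope at the call site, since the macro body refers to them
/// unqualified.
///
/// ```ignore
/// let db = open_cf(
///     "/tmp/example_db", // hypothetical path
///     None,
///     MetricConf::new("example_db"),
///     &["addresses", "balances"],
/// )?;
/// let (addresses, balances) =
///     reopen!(&db, "addresses";<String, u64>, "balances";<String, u64>);
/// ```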
128macro_rules! reopen {
129 ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
130 (
131 $(
 DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).unwrap_or_else(|e| panic!("Cannot open {} CF: {e}", $cf))
133 ),*
134 )
135 };
136}
137
138#[macro_export]
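/// Evaluates `$transaction` in a loop, retrying with a short random backoff (up to 50 ms)
/// whenever it returns `Err(TypedStoreError::RetryableTransaction)`, i.e. when an optimistic
/// transaction lost a write conflict. The single-argument form gives up after 20 retries; pass
/// a second argument to change the limit (`None` retries forever, as
/// `retry_transaction_forever!` does). It must be invoked from an async context, since it
/// sleeps between attempts, and expects `TypedStoreError` to be in scope.
///
/// A usage sketch with a hypothetical `balances: DBMap<String, u64>` backed by an
/// optimistic-transaction database:
///
/// ```ignore
/// let result = retry_transaction!({
///     let mut txn = balances.transaction()?;
///     txn.insert_batch(&balances, [("alice".to_string(), 100u64)])?;
///     txn.commit()
/// });
/// ```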
142macro_rules! retry_transaction {
143 ($transaction:expr) => {
144 retry_transaction!($transaction, Some(20))
145 };
146
147 (
148 $transaction:expr,
149 $max_retries:expr $(,)?
151
152 ) => {{
153 use rand::{
154 distributions::{Distribution, Uniform},
155 rngs::ThreadRng,
156 };
157 use tokio::time::{Duration, sleep};
158 use tracing::{error, info};
159
160 let mut retries = 0;
161 let max_retries = $max_retries;
162 loop {
163 let status = $transaction;
164 match status {
165 Err(TypedStoreError::RetryableTransaction) => {
166 retries += 1;
167 let delay = {
169 let mut rng = ThreadRng::default();
170 Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
171 };
172 if let Some(max_retries) = max_retries {
173 if retries > max_retries {
174 error!(?max_retries, "max retries exceeded");
175 break status;
176 }
177 }
178 if retries > 10 {
179 error!(?delay, ?retries, "excessive transaction retries...");
181 } else {
182 info!(
183 ?delay,
184 ?retries,
185 "transaction write conflict detected, sleeping"
186 );
187 }
188 sleep(delay).await;
189 }
190 _ => break status,
191 }
192 }
193 }};
194}
195
196#[macro_export]
197macro_rules! retry_transaction_forever {
198 ($transaction:expr) => {
199 $crate::retry_transaction!($transaction, None)
200 };
201}
202
203#[derive(Debug)]
204pub struct DBWithThreadModeWrapper {
205 pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
206 pub metric_conf: MetricConf,
207 pub db_path: PathBuf,
208}
209
210impl DBWithThreadModeWrapper {
211 fn new(
212 underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
213 metric_conf: MetricConf,
214 db_path: PathBuf,
215 ) -> Self {
216 DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
217 Self {
218 underlying,
219 metric_conf,
220 db_path,
221 }
222 }
223}
224
225impl Drop for DBWithThreadModeWrapper {
226 fn drop(&mut self) {
227 DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
228 }
229}
230
231#[derive(Debug)]
232pub struct OptimisticTransactionDBWrapper {
233 pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
234 pub metric_conf: MetricConf,
235 pub db_path: PathBuf,
236}
237
238impl OptimisticTransactionDBWrapper {
239 fn new(
240 underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
241 metric_conf: MetricConf,
242 db_path: PathBuf,
243 ) -> Self {
244 DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
245 Self {
246 underlying,
247 metric_conf,
248 db_path,
249 }
250 }
251}
252
253impl Drop for OptimisticTransactionDBWrapper {
254 fn drop(&mut self) {
255 DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
256 }
257}
258
259#[derive(Debug)]
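/// The two database flavors this crate can drive: a plain multi-threaded RocksDB instance or an
/// optimistic-transaction database. Most methods delegate to whichever variant is active.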
261pub enum RocksDB {
262 DBWithThreadMode(DBWithThreadModeWrapper),
263 OptimisticTransactionDB(OptimisticTransactionDBWrapper),
264}
265
266macro_rules! delegate_call {
267 ($self:ident.$method:ident($($args:ident),*)) => {
268 match $self {
269 Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
270 Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
271 }
272 }
273}
274
275impl Drop for RocksDB {
276 fn drop(&mut self) {
277 delegate_call!(self.cancel_all_background_work(true))
278 }
279}
280
281impl RocksDB {
282 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
283 delegate_call!(self.get(key))
284 }
285
286 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
287 &'a self,
288 keys: I,
289 readopts: &ReadOptions,
290 ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
291 where
292 K: AsRef<[u8]>,
293 I: IntoIterator<Item = (&'b W, K)>,
294 W: 'b + AsColumnFamilyRef,
295 {
296 delegate_call!(self.multi_get_cf_opt(keys, readopts))
297 }
298
299 pub fn batched_multi_get_cf_opt<I, K>(
300 &self,
301 cf: &impl AsColumnFamilyRef,
302 keys: I,
303 sorted_input: bool,
304 readopts: &ReadOptions,
305 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
306 where
307 I: IntoIterator<Item = K>,
308 K: AsRef<[u8]>,
309 {
310 delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
311 }
312
313 pub fn property_int_value_cf(
314 &self,
315 cf: &impl AsColumnFamilyRef,
316 name: impl CStrLike,
317 ) -> Result<Option<u64>, rocksdb::Error> {
318 delegate_call!(self.property_int_value_cf(cf, name))
319 }
320
321 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
322 &self,
323 cf: &impl AsColumnFamilyRef,
324 key: K,
325 readopts: &ReadOptions,
326 ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
327 delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
328 }
329
330 pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
331 delegate_call!(self.cf_handle(name))
332 }
333
334 pub fn create_cf<N: AsRef<str>>(
335 &self,
336 name: N,
337 opts: &rocksdb::Options,
338 ) -> Result<(), rocksdb::Error> {
339 delegate_call!(self.create_cf(name, opts))
340 }
341
342 pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
343 delegate_call!(self.drop_cf(name))
344 }
345
346 pub fn delete_file_in_range<K: AsRef<[u8]>>(
347 &self,
348 cf: &impl AsColumnFamilyRef,
349 from: K,
350 to: K,
351 ) -> Result<(), rocksdb::Error> {
352 delegate_call!(self.delete_file_in_range_cf(cf, from, to))
353 }
354
355 pub fn delete_cf<K: AsRef<[u8]>>(
356 &self,
357 cf: &impl AsColumnFamilyRef,
358 key: K,
359 writeopts: &WriteOptions,
360 ) -> Result<(), rocksdb::Error> {
361 fail_point!("delete-cf-before");
362 let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
363 fail_point!("delete-cf-after");
364 #[expect(clippy::let_and_return)]
365 ret
366 }
367
368 pub fn path(&self) -> &Path {
369 delegate_call!(self.path())
370 }
371
372 pub fn put_cf<K, V>(
373 &self,
374 cf: &impl AsColumnFamilyRef,
375 key: K,
376 value: V,
377 writeopts: &WriteOptions,
378 ) -> Result<(), rocksdb::Error>
379 where
380 K: AsRef<[u8]>,
381 V: AsRef<[u8]>,
382 {
383 fail_point!("put-cf-before");
384 let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
385 fail_point!("put-cf-after");
386 #[expect(clippy::let_and_return)]
387 ret
388 }
389
390 pub fn key_may_exist_cf<K: AsRef<[u8]>>(
391 &self,
392 cf: &impl AsColumnFamilyRef,
393 key: K,
394 readopts: &ReadOptions,
395 ) -> bool {
396 delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
397 }
398
399 pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
400 delegate_call!(self.try_catch_up_with_primary())
401 }
402
403 pub fn write(
404 &self,
405 batch: RocksDBBatch,
406 writeopts: &WriteOptions,
407 ) -> Result<(), TypedStoreError> {
408 fail_point!("batch-write-before");
409 let ret = match (self, batch) {
410 (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
411 db.underlying
412 .write_opt(batch, writeopts)
413 .map_err(typed_store_err_from_rocks_err)?;
414 Ok(())
415 }
416 (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
417 db.underlying
418 .write_opt(batch, writeopts)
419 .map_err(typed_store_err_from_rocks_err)?;
420 Ok(())
421 }
422 _ => Err(TypedStoreError::RocksDB(
423 "using invalid batch type for the database".to_string(),
424 )),
425 };
426 fail_point!("batch-write-after");
427 #[expect(clippy::let_and_return)]
428 ret
429 }
430
431 pub fn transaction_without_snapshot(
432 &self,
433 ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
434 match self {
435 Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
 Self::DBWithThreadMode(_) => panic!("transactions require an OptimisticTransactionDB"),
437 }
438 }
439
440 pub fn transaction(
441 &self,
442 ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
443 match self {
444 Self::OptimisticTransactionDB(db) => {
445 let mut tx_opts = OptimisticTransactionOptions::new();
446 tx_opts.set_snapshot(true);
447
448 Ok(db
449 .underlying
450 .transaction_opt(&WriteOptions::default(), &tx_opts))
451 }
 Self::DBWithThreadMode(_) => panic!("transactions require an OptimisticTransactionDB"),
453 }
454 }
455
456 pub fn raw_iterator_cf<'a: 'b, 'b>(
457 &'a self,
458 cf_handle: &impl AsColumnFamilyRef,
459 readopts: ReadOptions,
460 ) -> RocksDBRawIter<'b> {
461 match self {
462 Self::DBWithThreadMode(db) => {
463 RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
464 }
465 Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
466 db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
467 ),
468 }
469 }
470
471 pub fn iterator_cf<'a: 'b, 'b>(
472 &'a self,
473 cf_handle: &impl AsColumnFamilyRef,
474 readopts: ReadOptions,
475 mode: IteratorMode<'_>,
476 ) -> RocksDBIter<'b> {
477 match self {
478 Self::DBWithThreadMode(db) => {
479 RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
480 }
481 Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
482 db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
483 ),
484 }
485 }
486
487 pub fn compact_range_cf<K: AsRef<[u8]>>(
488 &self,
489 cf: &impl AsColumnFamilyRef,
490 start: Option<K>,
491 end: Option<K>,
492 ) {
493 delegate_call!(self.compact_range_cf(cf, start, end))
494 }
495
496 pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
497 &self,
498 cf: &impl AsColumnFamilyRef,
499 start: Option<K>,
500 end: Option<K>,
501 ) {
502 let opt = &mut CompactOptions::default();
503 opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
504 delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
505 }
506
507 pub fn flush(&self) -> Result<(), TypedStoreError> {
508 delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
509 }
510
511 pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
512 match self {
513 Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
514 Self::OptimisticTransactionDB(d) => {
515 RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
516 }
517 }
518 }
519
520 pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
521 let checkpoint = match self {
522 Self::DBWithThreadMode(d) => {
523 Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
524 }
525 Self::OptimisticTransactionDB(d) => {
526 Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
527 }
528 };
529 checkpoint
530 .create_checkpoint(path)
531 .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
532 Ok(())
533 }
534
535 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
536 delegate_call!(self.flush_cf(cf))
537 }
538
539 pub fn set_options_cf(
540 &self,
541 cf: &impl AsColumnFamilyRef,
542 opts: &[(&str, &str)],
543 ) -> Result<(), rocksdb::Error> {
544 delegate_call!(self.set_options_cf(cf, opts))
545 }
546
547 pub fn get_sampling_interval(&self) -> SamplingInterval {
548 match self {
549 Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
550 Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
551 }
552 }
553
554 pub fn multiget_sampling_interval(&self) -> SamplingInterval {
555 match self {
556 Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
557 Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
558 }
559 }
560
561 pub fn write_sampling_interval(&self) -> SamplingInterval {
562 match self {
563 Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
564 Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
565 }
566 }
567
568 pub fn iter_sampling_interval(&self) -> SamplingInterval {
569 match self {
570 Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
571 Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
572 }
573 }
574
575 pub fn db_name(&self) -> String {
576 let name = match self {
577 Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
578 Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
579 };
580 if name.is_empty() {
581 self.default_db_name()
582 } else {
583 name.clone()
584 }
585 }
586
587 fn default_db_name(&self) -> String {
588 self.path()
589 .file_name()
590 .and_then(|f| f.to_str())
591 .unwrap_or("unknown")
592 .to_string()
593 }
594
595 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
596 delegate_call!(self.live_files())
597 }
598}
599
600pub enum RocksDBSnapshot<'a> {
601 DBWithThreadMode(rocksdb::Snapshot<'a>),
602 OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
603}
604
605impl<'a> RocksDBSnapshot<'a> {
606 pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
607 &'a self,
608 keys: I,
609 readopts: ReadOptions,
610 ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
611 where
612 K: AsRef<[u8]>,
613 I: IntoIterator<Item = (&'b W, K)>,
614 W: 'b + AsColumnFamilyRef,
615 {
616 match self {
617 Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
618 Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
619 }
620 }
621 pub fn multi_get_cf<'b: 'a, K, I, W>(
622 &'a self,
623 keys: I,
624 ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
625 where
626 K: AsRef<[u8]>,
627 I: IntoIterator<Item = (&'b W, K)>,
628 W: 'b + AsColumnFamilyRef,
629 {
630 match self {
631 Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
632 Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
633 }
634 }
635}
636
637pub enum RocksDBBatch {
638 Regular(rocksdb::WriteBatch),
639 Transactional(rocksdb::WriteBatchWithTransaction<true>),
640}
641
642macro_rules! delegate_batch_call {
643 ($self:ident.$method:ident($($args:ident),*)) => {
644 match $self {
645 Self::Regular(b) => b.$method($($args),*),
646 Self::Transactional(b) => b.$method($($args),*),
647 }
648 }
649}
650
651impl RocksDBBatch {
652 fn size_in_bytes(&self) -> usize {
653 delegate_batch_call!(self.size_in_bytes())
654 }
655
656 pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
657 delegate_batch_call!(self.delete_cf(cf, key))
658 }
659
660 pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
661 where
662 K: AsRef<[u8]>,
663 V: AsRef<[u8]>,
664 {
665 delegate_batch_call!(self.put_cf(cf, key, value))
666 }
667
668 pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
669 where
670 K: AsRef<[u8]>,
671 V: AsRef<[u8]>,
672 {
673 delegate_batch_call!(self.merge_cf(cf, key, value))
674 }
675
676 pub fn delete_range_cf<K: AsRef<[u8]>>(
677 &mut self,
678 cf: &impl AsColumnFamilyRef,
679 from: K,
680 to: K,
681 ) -> Result<(), TypedStoreError> {
682 match self {
683 Self::Regular(batch) => {
684 batch.delete_range_cf(cf, from, to);
685 Ok(())
686 }
 Self::Transactional(_) => panic!("delete_range_cf is not supported for transactional write batches"),
688 }
689 }
690}
691
692#[derive(Debug, Default)]
693pub struct MetricConf {
694 pub db_name: String,
695 pub read_sample_interval: SamplingInterval,
696 pub write_sample_interval: SamplingInterval,
697 pub iter_sample_interval: SamplingInterval,
698}
699
700impl MetricConf {
701 pub fn new(db_name: &str) -> Self {
702 if db_name.is_empty() {
703 error!("A meaningful db name should be used for metrics reporting.")
704 }
705 Self {
706 db_name: db_name.to_string(),
707 read_sample_interval: SamplingInterval::default(),
708 write_sample_interval: SamplingInterval::default(),
709 iter_sample_interval: SamplingInterval::default(),
710 }
711 }
712
713 pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
714 Self {
715 db_name: self.db_name,
716 read_sample_interval: read_interval,
717 write_sample_interval: SamplingInterval::default(),
718 iter_sample_interval: SamplingInterval::default(),
719 }
720 }
721}
722const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
723const METRICS_ERROR: i64 = -1;
724
725#[derive(Clone, Debug)]
727pub struct DBMap<K, V> {
728 pub rocksdb: Arc<RocksDB>,
729 _phantom: PhantomData<fn(K) -> V>,
730 cf: String,
732 pub opts: ReadWriteOptions,
733 db_metrics: Arc<DBMetrics>,
734 get_sample_interval: SamplingInterval,
735 multiget_sample_interval: SamplingInterval,
736 write_sample_interval: SamplingInterval,
737 iter_sample_interval: SamplingInterval,
738 _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
739}
740
741unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
742
743impl<K, V> DBMap<K, V> {
744 pub(crate) fn new(
745 db: Arc<RocksDB>,
746 opts: &ReadWriteOptions,
747 opt_cf: &str,
748 is_deprecated: bool,
749 ) -> Self {
750 let db_cloned = db.clone();
751 let db_metrics = DBMetrics::get();
752 let db_metrics_cloned = db_metrics.clone();
753 let cf = opt_cf.to_string();
754 let (sender, mut recv) = tokio::sync::oneshot::channel();
755 if !is_deprecated {
756 tokio::task::spawn(async move {
757 let mut interval =
758 tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
759 loop {
760 tokio::select! {
761 _ = interval.tick() => {
762 let db = db_cloned.clone();
763 let cf = cf.clone();
764 let db_metrics = db_metrics.clone();
765 if let Err(e) = tokio::task::spawn_blocking(move || {
766 Self::report_metrics(&db, &cf, &db_metrics);
767 }).await {
768 error!("Failed to log metrics with error: {}", e);
769 }
770 }
771 _ = &mut recv => break,
772 }
773 }
774 debug!("Returning the cf metric logging task for DBMap: {}", &cf);
775 });
776 }
777 DBMap {
778 rocksdb: db.clone(),
779 opts: opts.clone(),
780 _phantom: PhantomData,
781 cf: opt_cf.to_string(),
782 db_metrics: db_metrics_cloned,
783 _metrics_task_cancel_handle: Arc::new(sender),
784 get_sample_interval: db.get_sampling_interval(),
785 multiget_sample_interval: db.multiget_sampling_interval(),
786 write_sample_interval: db.write_sampling_interval(),
787 iter_sample_interval: db.iter_sampling_interval(),
788 }
789 }
790
791 #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
798 pub fn open<P: AsRef<Path>>(
799 path: P,
800 metric_conf: MetricConf,
801 db_options: Option<rocksdb::Options>,
802 opt_cf: Option<&str>,
803 rw_options: &ReadWriteOptions,
804 ) -> Result<Self, TypedStoreError> {
805 let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
806 let cfs = vec![cf_key];
807 let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
808 Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
809 }
810
811 #[instrument(level = "debug", skip(db), err)]
851 pub fn reopen(
852 db: &Arc<RocksDB>,
853 opt_cf: Option<&str>,
854 rw_options: &ReadWriteOptions,
855 is_deprecated: bool,
856 ) -> Result<Self, TypedStoreError> {
857 let cf_key = opt_cf
858 .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
859 .to_owned();
860
861 db.cf_handle(&cf_key)
862 .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;
863
864 Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
865 }
866
867 pub fn batch(&self) -> DBBatch {
868 let batch = match *self.rocksdb {
869 RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
870 RocksDB::OptimisticTransactionDB(_) => {
871 RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
872 }
873 };
874 DBBatch::new(
875 &self.rocksdb,
876 batch,
877 self.opts.writeopts(),
878 &self.db_metrics,
879 &self.write_sample_interval,
880 )
881 }
882
883 pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
884 let from_buf = be_fix_int_ser(start)?;
885 let to_buf = be_fix_int_ser(end)?;
886 self.rocksdb
887 .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
888 Ok(())
889 }
890
891 pub fn compact_range_raw(
892 &self,
893 cf_name: &str,
894 start: Vec<u8>,
895 end: Vec<u8>,
896 ) -> Result<(), TypedStoreError> {
897 let cf = self
898 .rocksdb
899 .cf_handle(cf_name)
900 .expect("compact range: column family does not exist");
901 self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
902 Ok(())
903 }
904
905 pub fn compact_range_to_bottom<J: Serialize>(
906 &self,
907 start: &J,
908 end: &J,
909 ) -> Result<(), TypedStoreError> {
910 let from_buf = be_fix_int_ser(start)?;
911 let to_buf = be_fix_int_ser(end)?;
912 self.rocksdb
913 .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
914 Ok(())
915 }
916
917 pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
918 self.rocksdb
919 .cf_handle(&self.cf)
920 .expect("Map-keying column family should have been checked at DB creation")
921 }
922
923 pub fn iterator_cf(&self) -> RocksDBIter<'_> {
924 self.rocksdb
925 .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
926 }
927
928 pub fn flush(&self) -> Result<(), TypedStoreError> {
929 self.rocksdb
930 .flush_cf(&self.cf())
931 .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
932 }
933
934 pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
935 self.rocksdb.set_options_cf(&self.cf(), opts)
936 }
937
938 fn get_int_property(
939 rocksdb: &RocksDB,
940 cf: &impl AsColumnFamilyRef,
941 property_name: &std::ffi::CStr,
942 ) -> Result<i64, TypedStoreError> {
943 match rocksdb.property_int_value_cf(cf, property_name) {
944 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
945 Ok(None) => Ok(0),
946 Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
947 }
948 }
949
950 fn multi_get_pinned<J>(
952 &self,
953 keys: impl IntoIterator<Item = J>,
954 ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
955 where
956 J: Borrow<K>,
957 K: Serialize,
958 {
959 let _timer = self
960 .db_metrics
961 .op_metrics
962 .rocksdb_multiget_latency_seconds
963 .with_label_values(&[&self.cf])
964 .start_timer();
965 let perf_ctx = if self.multiget_sample_interval.sample() {
966 Some(RocksDBPerfContext)
967 } else {
968 None
969 };
970 let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
971 .into_iter()
972 .map(|k| be_fix_int_ser(k.borrow()))
973 .collect();
974 let results: Result<Vec<_>, TypedStoreError> = self
975 .rocksdb
976 .batched_multi_get_cf_opt(
977 &self.cf(),
978 keys_bytes?,
979 false,
981 &self.opts.readopts(),
982 )
983 .into_iter()
984 .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
985 .collect();
986 let entries = results?;
987 let entry_size = entries
988 .iter()
989 .flatten()
990 .map(|entry| entry.len())
991 .sum::<usize>();
992 self.db_metrics
993 .op_metrics
994 .rocksdb_multiget_bytes
995 .with_label_values(&[&self.cf])
996 .observe(entry_size as f64);
997 if perf_ctx.is_some() {
998 self.db_metrics
999 .read_perf_ctx_metrics
1000 .report_metrics(&self.cf);
1001 }
1002 Ok(entries)
1003 }
1004
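 /// Samples a fixed set of RocksDB properties for this column family and exports them as
 /// gauges; any property that cannot be read is reported as `METRICS_ERROR` (-1).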
1005 fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
1006 let cf = rocksdb.cf_handle(cf_name).expect("Failed to get cf");
1007 db_metrics
1008 .cf_metrics
1009 .rocksdb_total_sst_files_size
1010 .with_label_values(&[cf_name])
1011 .set(
1012 Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
1013 .unwrap_or(METRICS_ERROR),
1014 );
1015 db_metrics
1016 .cf_metrics
1017 .rocksdb_total_blob_files_size
1018 .with_label_values(&[cf_name])
1019 .set(
1020 Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
1021 .unwrap_or(METRICS_ERROR),
1022 );
1023 let total_num_files: i64 = (0..=6)
1027 .map(|level| {
1028 Self::get_int_property(rocksdb, &cf, &num_files_at_level(level))
1029 .unwrap_or(METRICS_ERROR)
1030 })
1031 .sum();
1032 db_metrics
1033 .cf_metrics
1034 .rocksdb_total_num_files
1035 .with_label_values(&[cf_name])
1036 .set(total_num_files);
1037 db_metrics
1038 .cf_metrics
1039 .rocksdb_num_level0_files
1040 .with_label_values(&[cf_name])
1041 .set(
1042 Self::get_int_property(rocksdb, &cf, &num_files_at_level(0))
1043 .unwrap_or(METRICS_ERROR),
1044 );
1045 db_metrics
1046 .cf_metrics
1047 .rocksdb_current_size_active_mem_tables
1048 .with_label_values(&[cf_name])
1049 .set(
1050 Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
1051 .unwrap_or(METRICS_ERROR),
1052 );
1053 db_metrics
1054 .cf_metrics
1055 .rocksdb_size_all_mem_tables
1056 .with_label_values(&[cf_name])
1057 .set(
1058 Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
1059 .unwrap_or(METRICS_ERROR),
1060 );
1061 db_metrics
1062 .cf_metrics
1063 .rocksdb_num_snapshots
1064 .with_label_values(&[cf_name])
1065 .set(
1066 Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
1067 .unwrap_or(METRICS_ERROR),
1068 );
1069 db_metrics
1070 .cf_metrics
1071 .rocksdb_oldest_snapshot_time
1072 .with_label_values(&[cf_name])
1073 .set(
1074 Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
1075 .unwrap_or(METRICS_ERROR),
1076 );
1077 db_metrics
1078 .cf_metrics
1079 .rocksdb_actual_delayed_write_rate
1080 .with_label_values(&[cf_name])
1081 .set(
1082 Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
1083 .unwrap_or(METRICS_ERROR),
1084 );
1085 db_metrics
1086 .cf_metrics
1087 .rocksdb_is_write_stopped
1088 .with_label_values(&[cf_name])
1089 .set(
1090 Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
1091 .unwrap_or(METRICS_ERROR),
1092 );
1093 db_metrics
1094 .cf_metrics
1095 .rocksdb_block_cache_capacity
1096 .with_label_values(&[cf_name])
1097 .set(
1098 Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
1099 .unwrap_or(METRICS_ERROR),
1100 );
1101 db_metrics
1102 .cf_metrics
1103 .rocksdb_block_cache_usage
1104 .with_label_values(&[cf_name])
1105 .set(
1106 Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
1107 .unwrap_or(METRICS_ERROR),
1108 );
1109 db_metrics
1110 .cf_metrics
1111 .rocksdb_block_cache_pinned_usage
1112 .with_label_values(&[cf_name])
1113 .set(
1114 Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
1115 .unwrap_or(METRICS_ERROR),
1116 );
1117 db_metrics
1118 .cf_metrics
1119 .rocksdb_estimate_table_readers_mem
1120 .with_label_values(&[cf_name])
1121 .set(
1122 Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
1123 .unwrap_or(METRICS_ERROR),
1124 );
1125 db_metrics
1126 .cf_metrics
1127 .rocksdb_estimated_num_keys
1128 .with_label_values(&[cf_name])
1129 .set(
1130 Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
1131 .unwrap_or(METRICS_ERROR),
1132 );
1133 db_metrics
1134 .cf_metrics
1135 .rocksdb_num_immutable_mem_tables
1136 .with_label_values(&[cf_name])
1137 .set(
1138 Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
1139 .unwrap_or(METRICS_ERROR),
1140 );
1141 db_metrics
1142 .cf_metrics
1143 .rocksdb_mem_table_flush_pending
1144 .with_label_values(&[cf_name])
1145 .set(
1146 Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
1147 .unwrap_or(METRICS_ERROR),
1148 );
1149 db_metrics
1150 .cf_metrics
1151 .rocksdb_compaction_pending
1152 .with_label_values(&[cf_name])
1153 .set(
1154 Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
1155 .unwrap_or(METRICS_ERROR),
1156 );
1157 db_metrics
1158 .cf_metrics
1159 .rocksdb_estimate_pending_compaction_bytes
1160 .with_label_values(&[cf_name])
1161 .set(
1162 Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
1163 .unwrap_or(METRICS_ERROR),
1164 );
1165 db_metrics
1166 .cf_metrics
1167 .rocksdb_num_running_compactions
1168 .with_label_values(&[cf_name])
1169 .set(
1170 Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1171 .unwrap_or(METRICS_ERROR),
1172 );
1173 db_metrics
1174 .cf_metrics
1175 .rocksdb_num_running_flushes
1176 .with_label_values(&[cf_name])
1177 .set(
1178 Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1179 .unwrap_or(METRICS_ERROR),
1180 );
1181 db_metrics
1182 .cf_metrics
1183 .rocksdb_estimate_oldest_key_time
1184 .with_label_values(&[cf_name])
1185 .set(
1186 Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1187 .unwrap_or(METRICS_ERROR),
1188 );
1189 db_metrics
1190 .cf_metrics
1191 .rocksdb_background_errors
1192 .with_label_values(&[cf_name])
1193 .set(
1194 Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1195 .unwrap_or(METRICS_ERROR),
1196 );
1197 db_metrics
1198 .cf_metrics
1199 .rocksdb_base_level
1200 .with_label_values(&[cf_name])
1201 .set(
1202 Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1203 .unwrap_or(METRICS_ERROR),
1204 );
1205 }
1206
1207 pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1208 DBTransaction::new(&self.rocksdb)
1209 }
1210
1211 pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
1212 DBTransaction::new_without_snapshot(&self.rocksdb)
1213 }
1214
1215 pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1216 self.rocksdb.checkpoint(path)
1217 }
1218
1219 pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
1220 Ok(self.rocksdb.snapshot())
1221 }
1222
1223 pub fn table_summary(&self) -> eyre::Result<TableSummary> {
1224 let mut num_keys = 0;
1225 let mut key_bytes_total = 0;
1226 let mut value_bytes_total = 0;
1227 let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1228 let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1229 let iter = self.iterator_cf().map(Result::unwrap);
1230 for (key, value) in iter {
1231 num_keys += 1;
1232 key_bytes_total += key.len();
1233 value_bytes_total += value.len();
1234 key_hist.record(key.len() as u64)?;
1235 value_hist.record(value.len() as u64)?;
1236 }
1237 Ok(TableSummary {
1238 num_keys,
1239 key_bytes_total,
1240 value_bytes_total,
1241 key_hist,
1242 value_hist,
1243 })
1244 }
1245
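 /// Sets up the latency timer, bytes/keys-scanned metrics, and (sampled) RocksDB perf context
 /// used to instrument a single iterator over this column family.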
1246 fn create_iter_context(
1248 &self,
1249 ) -> (
1250 Option<HistogramTimer>,
1251 Option<Histogram>,
1252 Option<Histogram>,
1253 Option<RocksDBPerfContext>,
1254 ) {
1255 let timer = self
1256 .db_metrics
1257 .op_metrics
1258 .rocksdb_iter_latency_seconds
1259 .with_label_values(&[&self.cf])
1260 .start_timer();
1261 let bytes_scanned = self
1262 .db_metrics
1263 .op_metrics
1264 .rocksdb_iter_bytes
1265 .with_label_values(&[&self.cf]);
1266 let keys_scanned = self
1267 .db_metrics
1268 .op_metrics
1269 .rocksdb_iter_keys
1270 .with_label_values(&[&self.cf]);
1271 let perf_ctx = if self.iter_sample_interval.sample() {
1272 Some(RocksDBPerfContext)
1273 } else {
1274 None
1275 };
1276 (
1277 Some(timer),
1278 Some(bytes_scanned),
1279 Some(keys_scanned),
1280 perf_ctx,
1281 )
1282 }
1283
1284 fn create_read_options_with_bounds(
1287 &self,
1288 lower_bound: Option<K>,
1289 upper_bound: Option<K>,
1290 ) -> ReadOptions
1291 where
1292 K: Serialize,
1293 {
1294 let mut readopts = self.opts.readopts();
1295 if let Some(lower_bound) = lower_bound {
1296 let key_buf = be_fix_int_ser(&lower_bound).unwrap();
1297 readopts.set_iterate_lower_bound(key_buf);
1298 }
1299 if let Some(upper_bound) = upper_bound {
1300 let key_buf = be_fix_int_ser(&upper_bound).unwrap();
1301 readopts.set_iterate_upper_bound(key_buf);
1302 }
1303 readopts
1304 }
1305
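 /// Translates a Rust `RangeBounds<K>` into RocksDB iterate bounds. RocksDB treats the lower
 /// bound as inclusive and the upper bound as exclusive, so an excluded start bound and an
 /// included end bound are handled by incrementing the serialized big-endian key; an included
 /// end bound that is already the maximum key (all `0xff` bytes) is left unbounded.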
1306 fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
1309 where
1310 K: Serialize,
1311 {
1312 let mut readopts = self.opts.readopts();
1313
1314 let lower_bound = range.start_bound();
1315 let upper_bound = range.end_bound();
1316
1317 match lower_bound {
1318 Bound::Included(lower_bound) => {
1319 let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1321 readopts.set_iterate_lower_bound(key_buf);
1322 }
1323 Bound::Excluded(lower_bound) => {
1324 let mut key_buf =
1325 be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
1326
1327 big_endian_saturating_add_one(&mut key_buf);
1329 readopts.set_iterate_lower_bound(key_buf);
1330 }
1331 Bound::Unbounded => (),
1332 };
1333
1334 match upper_bound {
1335 Bound::Included(upper_bound) => {
1336 let mut key_buf =
1337 be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1338
1339 if !is_max(&key_buf) {
1342 big_endian_saturating_add_one(&mut key_buf);
1344 readopts.set_iterate_upper_bound(key_buf);
1345 }
1346 }
1347 Bound::Excluded(upper_bound) => {
1348 let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
1350 readopts.set_iterate_upper_bound(key_buf);
1351 }
1352 Bound::Unbounded => (),
1353 };
1354
1355 readopts
1356 }
1357}
1358
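/// A write batch that buffers typed inserts, deletes, merges, and range deletes against one or
/// more `DBMap`s backed by the same underlying `RocksDB` handle, then commits them with
/// `write()` as a single atomic RocksDB write. Passing a map from a different database yields
/// `TypedStoreError::CrossDBBatch`.
///
/// A usage sketch with hypothetical maps `addresses: DBMap<String, u64>` and
/// `balances: DBMap<String, u64>` from the same database:
///
/// ```ignore
/// let mut batch = addresses.batch();
/// batch.insert_batch(&addresses, [("alice".to_string(), 100u64)])?;
/// batch.delete_batch(&balances, ["bob".to_string()])?;
/// batch.write()?; // applied atomically
/// ```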
1359pub struct DBBatch {
1426 rocksdb: Arc<RocksDB>,
1427 batch: RocksDBBatch,
1428 opts: WriteOptions,
1429 db_metrics: Arc<DBMetrics>,
1430 write_sample_interval: SamplingInterval,
1431}
1432
1433impl DBBatch {
1434 pub fn new(
1438 dbref: &Arc<RocksDB>,
1439 batch: RocksDBBatch,
1440 opts: WriteOptions,
1441 db_metrics: &Arc<DBMetrics>,
1442 write_sample_interval: &SamplingInterval,
1443 ) -> Self {
1444 DBBatch {
1445 rocksdb: dbref.clone(),
1446 batch,
1447 opts,
1448 db_metrics: db_metrics.clone(),
1449 write_sample_interval: write_sample_interval.clone(),
1450 }
1451 }
1452
1453 #[instrument(level = "trace", skip_all, err)]
1455 pub fn write(self) -> Result<(), TypedStoreError> {
1456 let db_name = self.rocksdb.db_name();
1457 let timer = self
1458 .db_metrics
1459 .op_metrics
1460 .rocksdb_batch_commit_latency_seconds
1461 .with_label_values(&[&db_name])
1462 .start_timer();
1463 let batch_size = self.batch.size_in_bytes();
1464
1465 let perf_ctx = if self.write_sample_interval.sample() {
1466 Some(RocksDBPerfContext)
1467 } else {
1468 None
1469 };
1470 self.rocksdb.write(self.batch, &self.opts)?;
1471 self.db_metrics
1472 .op_metrics
1473 .rocksdb_batch_commit_bytes
1474 .with_label_values(&[&db_name])
1475 .observe(batch_size as f64);
1476
1477 if perf_ctx.is_some() {
1478 self.db_metrics
1479 .write_perf_ctx_metrics
1480 .report_metrics(&db_name);
1481 }
1482 let elapsed = timer.stop_and_record();
1483 if elapsed > 1.0 {
1484 warn!(?elapsed, ?db_name, "very slow batch write");
1485 self.db_metrics
1486 .op_metrics
1487 .rocksdb_very_slow_batch_writes_count
1488 .with_label_values(&[&db_name])
1489 .inc();
1490 self.db_metrics
1491 .op_metrics
1492 .rocksdb_very_slow_batch_writes_duration_ms
1493 .with_label_values(&[&db_name])
1494 .inc_by((elapsed * 1000.0) as u64);
1495 }
1496 Ok(())
1497 }
1498
1499 pub fn size_in_bytes(&self) -> usize {
1500 self.batch.size_in_bytes()
1501 }
1502}
1503
1504impl DBBatch {
1506 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1507 &mut self,
1508 db: &DBMap<K, V>,
1509 purged_vals: impl IntoIterator<Item = J>,
1510 ) -> Result<(), TypedStoreError> {
1511 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1512 return Err(TypedStoreError::CrossDBBatch);
1513 }
1514
1515 purged_vals
1516 .into_iter()
1517 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1518 let k_buf = be_fix_int_ser(k.borrow())?;
1519 self.batch.delete_cf(&db.cf(), k_buf);
1520
1521 Ok(())
1522 })?;
1523 Ok(())
1524 }
1525
1526 pub fn schedule_delete_range<K: Serialize, V>(
1536 &mut self,
1537 db: &DBMap<K, V>,
1538 from: &K,
1539 to: &K,
1540 ) -> Result<(), TypedStoreError> {
1541 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1542 return Err(TypedStoreError::CrossDBBatch);
1543 }
1544
1545 let from_buf = be_fix_int_ser(from)?;
1546 let to_buf = be_fix_int_ser(to)?;
1547
1548 self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
1549 Ok(())
1550 }
1551
1552 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1554 &mut self,
1555 db: &DBMap<K, V>,
1556 new_vals: impl IntoIterator<Item = (J, U)>,
1557 ) -> Result<&mut Self, TypedStoreError> {
1558 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1559 return Err(TypedStoreError::CrossDBBatch);
1560 }
1561 let mut total = 0usize;
1562 new_vals
1563 .into_iter()
1564 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1565 let k_buf = be_fix_int_ser(k.borrow())?;
1566 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1567 total += k_buf.len() + v_buf.len();
1568 self.batch.put_cf(&db.cf(), k_buf, v_buf);
1569 Ok(())
1570 })?;
1571 self.db_metrics
1572 .op_metrics
1573 .rocksdb_batch_put_bytes
1574 .with_label_values(&[&db.cf])
1575 .observe(total as f64);
1576 Ok(self)
1577 }
1578
1579 pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1581 &mut self,
1582 db: &DBMap<K, V>,
1583 new_vals: impl IntoIterator<Item = (J, U)>,
1584 ) -> Result<&mut Self, TypedStoreError> {
1585 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1586 return Err(TypedStoreError::CrossDBBatch);
1587 }
1588
1589 new_vals
1590 .into_iter()
1591 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1592 let k_buf = be_fix_int_ser(k.borrow())?;
1593 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1594 self.batch.merge_cf(&db.cf(), k_buf, v_buf);
1595 Ok(())
1596 })?;
1597 Ok(self)
1598 }
1599
1600 pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
1602 &mut self,
1603 db: &DBMap<K, V>,
1604 new_vals: impl IntoIterator<Item = (J, B)>,
1605 ) -> Result<&mut Self, TypedStoreError> {
1606 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1607 return Err(TypedStoreError::CrossDBBatch);
1608 }
1609 new_vals
1610 .into_iter()
1611 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1612 let k_buf = be_fix_int_ser(k.borrow())?;
1613 self.batch.merge_cf(&db.cf(), k_buf, v);
1614 Ok(())
1615 })?;
1616 Ok(self)
1617 }
1618}
1619
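/// An optimistic transaction over an `OptimisticTransactionDB`-backed store: reads and writes
/// are staged on the transaction and become visible only on `commit()`, which converts RocksDB
/// `Busy`/`TryAgain` errors into `TypedStoreError::RetryableTransaction` so the whole attempt
/// can be wrapped in `retry_transaction!`. Constructing one from a plain `DBWithThreadMode`
/// database panics.
///
/// A read-modify-write sketch with a hypothetical `balances: DBMap<String, u64>`:
///
/// ```ignore
/// let result = retry_transaction!({
///     let mut txn = balances.transaction()?;
///     let current = txn
///         .get_for_update(&balances, &"alice".to_string())?
///         .unwrap_or(0);
///     txn.insert_batch(&balances, [("alice".to_string(), current + 1)])?;
///     txn.commit()
/// });
/// ```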
1620pub struct DBTransaction<'a> {
1621 rocksdb: Arc<RocksDB>,
1622 transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
1623}
1624
1625impl<'a> DBTransaction<'a> {
1626 pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1627 Ok(Self {
1628 rocksdb: db.clone(),
1629 transaction: db.transaction()?,
1630 })
1631 }
1632
1633 pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
1634 Ok(Self {
1635 rocksdb: db.clone(),
1636 transaction: db.transaction_without_snapshot()?,
1637 })
1638 }
1639
1640 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1641 &mut self,
1642 db: &DBMap<K, V>,
1643 new_vals: impl IntoIterator<Item = (J, U)>,
1644 ) -> Result<&mut Self, TypedStoreError> {
1645 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1646 return Err(TypedStoreError::CrossDBBatch);
1647 }
1648
1649 new_vals
1650 .into_iter()
1651 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1652 let k_buf = be_fix_int_ser(k.borrow())?;
1653 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1654 self.transaction
1655 .put_cf(&db.cf(), k_buf, v_buf)
1656 .map_err(typed_store_err_from_rocks_err)?;
1657 Ok(())
1658 })?;
1659 Ok(self)
1660 }
1661
1662 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1664 &mut self,
1665 db: &DBMap<K, V>,
1666 purged_vals: impl IntoIterator<Item = J>,
1667 ) -> Result<&mut Self, TypedStoreError> {
1668 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1669 return Err(TypedStoreError::CrossDBBatch);
1670 }
1671 purged_vals
1672 .into_iter()
1673 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1674 let k_buf = be_fix_int_ser(k.borrow())?;
1675 self.transaction
1676 .delete_cf(&db.cf(), k_buf)
1677 .map_err(typed_store_err_from_rocks_err)?;
1678 Ok(())
1679 })?;
1680 Ok(self)
1681 }
1682
1683 pub fn snapshot(
1684 &self,
1685 ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
1686 {
1687 self.transaction.snapshot()
1688 }
1689
1690 pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
1691 &self,
1692 db: &DBMap<K, V>,
1693 key: &K,
1694 ) -> Result<Option<V>, TypedStoreError> {
1695 if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
1696 return Err(TypedStoreError::CrossDBBatch);
1697 }
1698 let k_buf = be_fix_int_ser(key)?;
1699 match self
1700 .transaction
1701 .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
1702 .map_err(typed_store_err_from_rocks_err)?
1703 {
1704 Some(data) => Ok(Some(
1705 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1706 )),
1707 None => Ok(None),
1708 }
1709 }
1710
1711 pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
1712 &self,
1713 db: &DBMap<K, V>,
1714 key: &K,
1715 ) -> Result<Option<V>, TypedStoreError> {
1716 let key_buf = be_fix_int_ser(key)?;
1717 self.transaction
1718 .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
1719 .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
1720 .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
1721 }
1722
1723 pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
1724 &self,
1725 db: &DBMap<K, V>,
1726 keys: impl IntoIterator<Item = J>,
1727 ) -> Result<Vec<Option<V>>, TypedStoreError> {
1728 let cf = db.cf();
1729 let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
1730 .into_iter()
1731 .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
1732 .collect();
1733
1734 let results = self
1735 .transaction
1736 .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());
1737
1738 let values_parsed: Result<Vec<_>, TypedStoreError> = results
1739 .into_iter()
1740 .map(
1741 |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
1742 Some(data) => Ok(Some(
1743 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1744 )),
1745 None => Ok(None),
1746 },
1747 )
1748 .collect();
1749
1750 values_parsed
1751 }
1752
1753 pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
1754 &'a self,
1755 db: &DBMap<K, V>,
1756 ) -> Iter<'a, K, V> {
1757 let db_iter = self
1758 .transaction
1759 .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
1760 Iter::new(
1761 db.cf.clone(),
1762 RocksDBRawIter::OptimisticTransaction(db_iter),
1763 None,
1764 None,
1765 None,
1766 None,
1767 None,
1768 )
1769 }
1770
1771 pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
1772 &'a self,
1773 db: &DBMap<K, V>,
1774 ) -> Keys<'a, K> {
1775 let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1776 self.transaction
1777 .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1778 );
1779 db_iter.seek_to_first();
1780
1781 Keys::new(db_iter)
1782 }
1783
1784 pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
1785 &'a self,
1786 db: &DBMap<K, V>,
1787 ) -> Values<'a, V> {
1788 let mut db_iter = RocksDBRawIter::OptimisticTransaction(
1789 self.transaction
1790 .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
1791 );
1792 db_iter.seek_to_first();
1793
1794 Values::new(db_iter)
1795 }
1796
1797 pub fn commit(self) -> Result<(), TypedStoreError> {
1798 fail_point!("transaction-commit");
1799 self.transaction.commit().map_err(|e| match e.kind() {
1800 ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
1803 _ => typed_store_err_from_rocks_err(e),
1804 })?;
1805 Ok(())
1806 }
1807}
1808
1809macro_rules! delegate_iter_call {
1810 ($self:ident.$method:ident($($args:ident),*)) => {
1811 match $self {
1812 Self::DB(db) => db.$method($($args),*),
1813 Self::OptimisticTransactionDB(db) => db.$method($($args),*),
1814 Self::OptimisticTransaction(db) => db.$method($($args),*),
1815 }
1816 }
1817}
1818
1819pub enum RocksDBRawIter<'a> {
1820 DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1821 OptimisticTransactionDB(
1822 rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1823 ),
1824 OptimisticTransaction(
1825 rocksdb::DBRawIteratorWithThreadMode<
1826 'a,
1827 Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1828 >,
1829 ),
1830}
1831
1832impl RocksDBRawIter<'_> {
1833 pub fn valid(&self) -> bool {
1834 delegate_iter_call!(self.valid())
1835 }
1836 pub fn key(&self) -> Option<&[u8]> {
1837 delegate_iter_call!(self.key())
1838 }
1839 pub fn value(&self) -> Option<&[u8]> {
1840 delegate_iter_call!(self.value())
1841 }
1842 pub fn next(&mut self) {
1843 delegate_iter_call!(self.next())
1844 }
1845 pub fn prev(&mut self) {
1846 delegate_iter_call!(self.prev())
1847 }
1848 pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
1849 delegate_iter_call!(self.seek(key))
1850 }
1851 pub fn seek_to_last(&mut self) {
1852 delegate_iter_call!(self.seek_to_last())
1853 }
1854 pub fn seek_to_first(&mut self) {
1855 delegate_iter_call!(self.seek_to_first())
1856 }
1857 pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
1858 delegate_iter_call!(self.seek_for_prev(key))
1859 }
1860 pub fn status(&self) -> Result<(), rocksdb::Error> {
1861 delegate_iter_call!(self.status())
1862 }
1863}
1864
1865pub enum RocksDBIter<'a> {
1866 DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
1867 OptimisticTransactionDB(
1868 rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
1869 ),
1870}
1871
1872impl Iterator for RocksDBIter<'_> {
1873 type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
1874 fn next(&mut self) -> Option<Self::Item> {
1875 match self {
1876 Self::DB(db) => db.next(),
1877 Self::OptimisticTransactionDB(db) => db.next(),
1878 }
1879 }
1880}
1881
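// The typed `Map` implementation below fixes the on-disk encoding: keys are serialized with
// `be_fix_int_ser` (bincode configured for big-endian, fixed-width integers) and values with
// `bcs`. Every operation also records per-column-family latency and byte metrics and, on a
// sampling interval, RocksDB perf-context metrics.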
1882impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1883where
1884 K: Serialize + DeserializeOwned,
1885 V: Serialize + DeserializeOwned,
1886{
1887 type Error = TypedStoreError;
1888 type Iterator = Iter<'a, K, V>;
1889 type SafeIterator = SafeIter<'a, K, V>;
1890 type Keys = Keys<'a, K>;
1891 type Values = Values<'a, V>;
1892
1893 #[instrument(level = "trace", skip_all, err)]
1894 fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1895 let key_buf = be_fix_int_ser(key)?;
1896 let readopts = self.opts.readopts();
1899 Ok(self
1900 .rocksdb
1901 .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
1902 && self
1903 .rocksdb
1904 .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
1905 .map_err(typed_store_err_from_rocks_err)?
1906 .is_some())
1907 }
1908
1909 #[instrument(level = "trace", skip_all, err)]
1910 fn multi_contains_keys<J>(
1911 &self,
1912 keys: impl IntoIterator<Item = J>,
1913 ) -> Result<Vec<bool>, Self::Error>
1914 where
1915 J: Borrow<K>,
1916 {
1917 let values = self.multi_get_pinned(keys)?;
1918 Ok(values.into_iter().map(|v| v.is_some()).collect())
1919 }
1920
1921 #[instrument(level = "trace", skip_all, err)]
1922 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1923 let _timer = self
1924 .db_metrics
1925 .op_metrics
1926 .rocksdb_get_latency_seconds
1927 .with_label_values(&[&self.cf])
1928 .start_timer();
1929 let perf_ctx = if self.get_sample_interval.sample() {
1930 Some(RocksDBPerfContext)
1931 } else {
1932 None
1933 };
1934 let key_buf = be_fix_int_ser(key)?;
1935 let res = self
1936 .rocksdb
1937 .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1938 .map_err(typed_store_err_from_rocks_err)?;
1939 self.db_metrics
1940 .op_metrics
1941 .rocksdb_get_bytes
1942 .with_label_values(&[&self.cf])
1943 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1944 if perf_ctx.is_some() {
1945 self.db_metrics
1946 .read_perf_ctx_metrics
1947 .report_metrics(&self.cf);
1948 }
1949 match res {
1950 Some(data) => Ok(Some(
1951 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1952 )),
1953 None => Ok(None),
1954 }
1955 }
1956
1957 #[instrument(level = "trace", skip_all, err)]
1958 fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
1959 let _timer = self
1960 .db_metrics
1961 .op_metrics
1962 .rocksdb_get_latency_seconds
1963 .with_label_values(&[&self.cf])
1964 .start_timer();
1965 let perf_ctx = if self.get_sample_interval.sample() {
1966 Some(RocksDBPerfContext)
1967 } else {
1968 None
1969 };
1970 let key_buf = be_fix_int_ser(key)?;
1971 let res = self
1972 .rocksdb
1973 .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
1974 .map_err(typed_store_err_from_rocks_err)?;
1975 self.db_metrics
1976 .op_metrics
1977 .rocksdb_get_bytes
1978 .with_label_values(&[&self.cf])
1979 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1980 if perf_ctx.is_some() {
1981 self.db_metrics
1982 .read_perf_ctx_metrics
1983 .report_metrics(&self.cf);
1984 }
1985 match res {
1986 Some(data) => Ok(Some(data.to_vec())),
1987 None => Ok(None),
1988 }
1989 }
1990
1991 #[instrument(level = "trace", skip_all, err)]
1992 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1993 let timer = self
1994 .db_metrics
1995 .op_metrics
1996 .rocksdb_put_latency_seconds
1997 .with_label_values(&[&self.cf])
1998 .start_timer();
1999 let perf_ctx = if self.write_sample_interval.sample() {
2000 Some(RocksDBPerfContext)
2001 } else {
2002 None
2003 };
2004 let key_buf = be_fix_int_ser(key)?;
2005 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
2006 self.db_metrics
2007 .op_metrics
2008 .rocksdb_put_bytes
2009 .with_label_values(&[&self.cf])
2010 .observe((key_buf.len() + value_buf.len()) as f64);
2011 if perf_ctx.is_some() {
2012 self.db_metrics
2013 .write_perf_ctx_metrics
2014 .report_metrics(&self.cf);
2015 }
2016 self.rocksdb
2017 .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
2018 .map_err(typed_store_err_from_rocks_err)?;
2019
2020 let elapsed = timer.stop_and_record();
2021 if elapsed > 1.0 {
2022 warn!(?elapsed, cf = ?self.cf, "very slow insert");
2023 self.db_metrics
2024 .op_metrics
2025 .rocksdb_very_slow_puts_count
2026 .with_label_values(&[&self.cf])
2027 .inc();
2028 self.db_metrics
2029 .op_metrics
2030 .rocksdb_very_slow_puts_duration_ms
2031 .with_label_values(&[&self.cf])
2032 .inc_by((elapsed * 1000.0) as u64);
2033 }
2034
2035 Ok(())
2036 }
2037
2038 #[instrument(level = "trace", skip_all, err)]
2039 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
2040 let _timer = self
2041 .db_metrics
2042 .op_metrics
2043 .rocksdb_delete_latency_seconds
2044 .with_label_values(&[&self.cf])
2045 .start_timer();
2046 let perf_ctx = if self.write_sample_interval.sample() {
2047 Some(RocksDBPerfContext)
2048 } else {
2049 None
2050 };
2051 let key_buf = be_fix_int_ser(key)?;
2052 self.rocksdb
2053 .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
2054 .map_err(typed_store_err_from_rocks_err)?;
2055 self.db_metrics
2056 .op_metrics
2057 .rocksdb_deletes
2058 .with_label_values(&[&self.cf])
2059 .inc();
2060 if perf_ctx.is_some() {
2061 self.db_metrics
2062 .write_perf_ctx_metrics
2063 .report_metrics(&self.cf);
2064 }
2065 Ok(())
2066 }
2067
2068 #[instrument(level = "trace", skip_all, err)]
2078 fn delete_file_in_range(&self, from: &K, to: &K) -> Result<(), TypedStoreError> {
2079 let from_buf = be_fix_int_ser(from.borrow())?;
2080 let to_buf = be_fix_int_ser(to.borrow())?;
2081 self.rocksdb
2082 .delete_file_in_range(&self.cf(), from_buf, to_buf)
2083 .map_err(typed_store_err_from_rocks_err)?;
2084 Ok(())
2085 }
2086
2087 #[instrument(level = "trace", skip_all, err)]
2092 fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
2093 let _ = self.rocksdb.drop_cf(&self.cf);
2094 self.rocksdb
2095 .create_cf(self.cf.clone(), &default_db_options().options)
2096 .map_err(typed_store_err_from_rocks_err)?;
2097 Ok(())
2098 }
2099
2100 #[instrument(level = "trace", skip_all, err)]
2109 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
2110 let mut iter = self.unbounded_iter().seek_to_first();
2111 let first_key = iter.next().map(|(k, _v)| k);
2112 let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
2113 if let Some((first_key, last_key)) = first_key.zip(last_key) {
2114 let mut batch = self.batch();
2115 batch.schedule_delete_range(self, &first_key, &last_key)?;
2116 batch.write()?;
2117 }
2118 Ok(())
2119 }
2120
2121 fn is_empty(&self) -> bool {
2122 self.safe_iter().next().is_none()
2123 }
2124
2125 fn unbounded_iter(&'a self) -> Self::Iterator {
2128 let db_iter = self
2129 .rocksdb
2130 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2131 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2132 Iter::new(
2133 self.cf.clone(),
2134 db_iter,
2135 _timer,
2136 _perf_ctx,
2137 bytes_scanned,
2138 keys_scanned,
2139 Some(self.db_metrics.clone()),
2140 )
2141 }
2142
2143 fn iter_with_bounds(
2147 &'a self,
2148 lower_bound: Option<K>,
2149 upper_bound: Option<K>,
2150 ) -> Self::Iterator {
2151 let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2152 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2153 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2154 Iter::new(
2155 self.cf.clone(),
2156 db_iter,
2157 _timer,
2158 _perf_ctx,
2159 bytes_scanned,
2160 keys_scanned,
2161 Some(self.db_metrics.clone()),
2162 )
2163 }
2164
2165 fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
2168 let readopts = self.create_read_options_with_range(range);
2169 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2170 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2171 Iter::new(
2172 self.cf.clone(),
2173 db_iter,
2174 _timer,
2175 _perf_ctx,
2176 bytes_scanned,
2177 keys_scanned,
2178 Some(self.db_metrics.clone()),
2179 )
2180 }
2181
2182 fn safe_iter(&'a self) -> Self::SafeIterator {
2183 let db_iter = self
2184 .rocksdb
2185 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2186 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2187 SafeIter::new(
2188 self.cf.clone(),
2189 db_iter,
2190 _timer,
2191 _perf_ctx,
2192 bytes_scanned,
2193 keys_scanned,
2194 Some(self.db_metrics.clone()),
2195 )
2196 }
2197
2198 fn safe_iter_with_bounds(
2199 &'a self,
2200 lower_bound: Option<K>,
2201 upper_bound: Option<K>,
2202 ) -> Self::SafeIterator {
2203 let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
2204 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2205 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2206 SafeIter::new(
2207 self.cf.clone(),
2208 db_iter,
2209 _timer,
2210 _perf_ctx,
2211 bytes_scanned,
2212 keys_scanned,
2213 Some(self.db_metrics.clone()),
2214 )
2215 }
2216
2217 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
2218 let readopts = self.create_read_options_with_range(range);
2219 let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
2220 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2221 SafeIter::new(
2222 self.cf.clone(),
2223 db_iter,
2224 _timer,
2225 _perf_ctx,
2226 bytes_scanned,
2227 keys_scanned,
2228 Some(self.db_metrics.clone()),
2229 )
2230 }
2231
2232 fn keys(&'a self) -> Self::Keys {
2233 let mut db_iter = self
2234 .rocksdb
2235 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2236 db_iter.seek_to_first();
2237
2238 Keys::new(db_iter)
2239 }
2240
2241 fn values(&'a self) -> Self::Values {
2242 let mut db_iter = self
2243 .rocksdb
2244 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2245 db_iter.seek_to_first();
2246
2247 Values::new(db_iter)
2248 }
2249
2250 #[instrument(level = "trace", skip_all, err)]
2252 fn multi_get_raw_bytes<J>(
2253 &self,
2254 keys: impl IntoIterator<Item = J>,
2255 ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
2256 where
2257 J: Borrow<K>,
2258 {
2259 let results = self
2260 .multi_get_pinned(keys)?
2261 .into_iter()
2262 .map(|val| val.map(|v| v.to_vec()))
2263 .collect();
2264 Ok(results)
2265 }
2266
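/// Fetches many values in one multi-get round trip and deserializes them with
/// BCS. A minimal sketch (not compiled as a doctest), assuming a hypothetical
/// `table: DBMap<u64, String>` with the `Map` trait in scope:
///
/// ```ignore
/// let found: Vec<Option<String>> = table.multi_get([1u64, 2, 3])?;
/// for (key, value) in [1u64, 2, 3].into_iter().zip(found) {
///     println!("{key} -> {value:?}");
/// }
/// ```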
2267 #[instrument(level = "trace", skip_all, err)]
2269 fn multi_get<J>(
2270 &self,
2271 keys: impl IntoIterator<Item = J>,
2272 ) -> Result<Vec<Option<V>>, TypedStoreError>
2273 where
2274 J: Borrow<K>,
2275 {
2276 let results = self.multi_get_pinned(keys)?;
2277 let values_parsed: Result<Vec<_>, TypedStoreError> = results
2278 .into_iter()
2279 .map(|value_byte| match value_byte {
2280 Some(data) => Ok(Some(
2281 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2282 )),
2283 None => Ok(None),
2284 })
2285 .collect();
2286
2287 values_parsed
2288 }
2289
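/// Variant of `multi_get` that reads the keys in fixed-size chunks against a
/// single snapshot, bounding the size of each underlying lookup while keeping
/// all reads mutually consistent. Sketch only (not compiled as a doctest),
/// assuming a hypothetical `table: DBMap<u64, String>`:
///
/// ```ignore
/// let keys: Vec<u64> = (0..10_000).collect();
/// // Issue the lookups 1000 keys at a time.
/// let found: Vec<Option<String>> = table.chunked_multi_get(&keys, 1_000)?;
/// assert_eq!(found.len(), keys.len());
/// ```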
2290 #[instrument(level = "trace", skip_all, err)]
2292 fn chunked_multi_get<J>(
2293 &self,
2294 keys: impl IntoIterator<Item = J>,
2295 chunk_size: usize,
2296 ) -> Result<Vec<Option<V>>, TypedStoreError>
2297 where
2298 J: Borrow<K>,
2299 {
2300 let cf = self.cf();
2301 let keys_bytes = keys
2302 .into_iter()
2303 .map(|k| be_fix_int_ser(k.borrow())).collect::<Result<Vec<_>, TypedStoreError>>()?;
2304 let chunked_keys = keys_bytes.iter().map(|k| (&cf, k)).chunks(chunk_size);
2305 let snapshot = self.snapshot()?;
2306 let mut results = vec![];
2307 for chunk in chunked_keys.into_iter() {
2308 let chunk_result = snapshot.multi_get_cf(chunk);
2309 let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
2310 .into_iter()
2311 .map(|value_byte| {
2312 let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
2313 match value_byte {
2314 Some(data) => Ok(Some(
2315 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2316 )),
2317 None => Ok(None),
2318 }
2319 })
2320 .collect();
2321 results.extend(values_parsed?);
2322 }
2323 Ok(results)
2324 }
2325
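/// Inserts all key-value pairs through a single write batch, so either every
/// pair is applied or none is. Sketch only (not compiled as a doctest),
/// assuming a hypothetical `table: DBMap<u64, String>` with the `Map` trait in
/// scope; `multi_remove` below works the same way for deletions:
///
/// ```ignore
/// table.multi_insert([(1u64, "a".to_string()), (2, "b".to_string())])?;
/// table.multi_remove([1u64])?;
/// ```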
2326 #[instrument(level = "trace", skip_all, err)]
2328 fn multi_insert<J, U>(
2329 &self,
2330 key_val_pairs: impl IntoIterator<Item = (J, U)>,
2331 ) -> Result<(), Self::Error>
2332 where
2333 J: Borrow<K>,
2334 U: Borrow<V>,
2335 {
2336 let mut batch = self.batch();
2337 batch.insert_batch(self, key_val_pairs)?;
2338 batch.write()
2339 }
2340
2341 #[instrument(level = "trace", skip_all, err)]
2343 fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2344 where
2345 J: Borrow<K>,
2346 {
2347 let mut batch = self.batch();
2348 batch.delete_batch(self, keys)?;
2349 batch.write()
2350 }
2351
2352 #[instrument(level = "trace", skip_all, err)]
2354 fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2355 self.rocksdb
2356 .try_catch_up_with_primary()
2357 .map_err(typed_store_err_from_rocks_err)
2358 }
2359}
2360
2361impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
2362where
2363 J: Borrow<K>,
2364 U: Borrow<V>,
2365 K: Serialize,
2366 V: Serialize,
2367{
2368 type Error = TypedStoreError;
2369
2370 fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
2371 where
2372 T: Iterator<Item = (J, U)>,
2373 {
2374 let mut batch = self.batch();
2375 batch.insert_batch(self, iter)?;
2376 batch.write()
2377 }
2378
2379 fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
2380 let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
2381 let mut batch = self.batch();
2382 batch.insert_batch(self, slice_of_refs)?;
2383 batch.write()
2384 }
2385}
2386
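/// Reads a `usize` tuning knob from the environment, returning `None` when the
/// variable is unset and logging a warning when it does not parse as an
/// integer. A minimal sketch (not compiled as a doctest) mirroring how the
/// option constants in this module are consumed:
///
/// ```ignore
/// let write_buffer_mb = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
///     .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB);
/// ```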
2387pub fn read_size_from_env(var_name: &str) -> Option<usize> {
2388 env::var(var_name)
2389 .ok()?
2390 .parse::<usize>()
2391 .tap_err(|e| {
2392 warn!(
2393 "Env var {} does not contain a valid usize integer: {}",
2394 var_name, e
2395 )
2396 })
2397 .ok()
2398}
2399
2400#[derive(Clone, Debug)]
2401pub struct ReadWriteOptions {
2402 pub ignore_range_deletions: bool,
2403 sync_to_disk: bool,
2405}
2406
2407impl ReadWriteOptions {
2408 pub fn readopts(&self) -> ReadOptions {
2409 let mut readopts = ReadOptions::default();
2410 readopts.set_ignore_range_deletions(self.ignore_range_deletions);
2411 readopts
2412 }
2413
2414 pub fn writeopts(&self) -> WriteOptions {
2415 let mut opts = WriteOptions::default();
2416 opts.set_sync(self.sync_to_disk);
2417 opts
2418 }
2419
2420 pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
2421 self.ignore_range_deletions = ignore;
2422 self
2423 }
2424}
2425
2426impl Default for ReadWriteOptions {
2427 fn default() -> Self {
2428 Self {
2429 ignore_range_deletions: true,
2430 sync_to_disk: env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
2431 }
2432 }
2433}
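/// Bundle of RocksDB column-family options plus the read/write options used by
/// `DBMap`. The `optimize_*` helpers below consume and return `self`, so they
/// can be chained. Illustrative sketch (not compiled as a doctest):
///
/// ```ignore
/// let opts = default_db_options()
///     .optimize_for_write_throughput()
///     .optimize_for_large_values_no_scan(4 * 1024);
/// ```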
2434#[derive(Default, Clone)]
2437pub struct DBOptions {
2438 pub options: rocksdb::Options,
2439 pub rw_options: ReadWriteOptions,
2440}
2441
2442impl DBOptions {
2443 pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
2447 self.options
2449 .optimize_for_point_lookup(block_cache_size_mb as u64);
2450 self
2451 }
2452
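/// Moves large values out of the LSM tree into LZ4-compressed blob files (with
/// blob garbage collection enabled) once they reach `min_blob_size` bytes;
/// setting the `DISABLE_BLOB_STORAGE` env var turns the optimization off. A
/// minimal sketch (not compiled as a doctest):
///
/// ```ignore
/// // Keep values of 4 KiB and larger in blob files.
/// let opts = default_db_options().optimize_for_large_values_no_scan(4 * 1024);
/// ```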
2453 pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
2456 if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
2457 info!("Large value blob storage optimization is disabled via env var.");
2458 return self;
2459 }
2460
2461 self.options.set_enable_blob_files(true);
2463 self.options
2464 .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
2465 self.options.set_enable_blob_gc(true);
2466 self.options.set_min_blob_size(min_blob_size);
2470
2471 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2473 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2474 * 1024
2475 * 1024;
2476 self.options.set_write_buffer_size(write_buffer_size);
2477 let target_file_size_base = 64 << 20;
2480 self.options
2481 .set_target_file_size_base(target_file_size_base);
2482 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2484 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2485 self.options
2486 .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
2487
2488 self
2489 }
2490
2491 pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
2493 self.options
2494 .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
2495 self
2496 }
2497
2498 pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
2500 self.options
2501 .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
2502 self.options
2503 .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
2504 self
2505 }
2506
2507 pub fn optimize_for_write_throughput(mut self) -> DBOptions {
2509 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2511 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2512 * 1024
2513 * 1024;
2514 self.options.set_write_buffer_size(write_buffer_size);
2515 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2517 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2518 self.options
2519 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2520 self.options
2522 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2523
2524 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2526 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
2527 self.options.set_level_zero_file_num_compaction_trigger(
2528 max_level_zero_file_num.try_into().unwrap(),
2529 );
2530 self.options.set_level_zero_slowdown_writes_trigger(
2531 (max_level_zero_file_num * 12).try_into().unwrap(),
2532 );
2533 self.options
2534 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2535
2536 self.options.set_target_file_size_base(
2538 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2539 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2540 * 1024
2541 * 1024,
2542 );
2543
2544 self.options
2546 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2547
2548 self
2549 }
2550
2551 pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
2555 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
2557 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
2558 * 1024
2559 * 1024;
2560 self.options.set_write_buffer_size(write_buffer_size);
2561 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
2563 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
2564 self.options
2565 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
2566 self.options
2568 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
2569
2570 self.options
2572 .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
2573 let mut compaction_options = rocksdb::UniversalCompactOptions::default();
2574 compaction_options.set_max_size_amplification_percent(10000);
2575 compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
2576 self.options
2577 .set_universal_compaction_options(&compaction_options);
2578
2579 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
2580 .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
2581 self.options.set_level_zero_file_num_compaction_trigger(
2582 max_level_zero_file_num.try_into().unwrap(),
2583 );
2584 self.options.set_level_zero_slowdown_writes_trigger(
2585 (max_level_zero_file_num * 12).try_into().unwrap(),
2586 );
2587 self.options
2588 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
2589
2590 self.options.set_target_file_size_base(
2592 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
2593 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
2594 * 1024
2595 * 1024,
2596 );
2597
2598 self.options
2600 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
2601
2602 self
2603 }
2604
2605 pub fn set_block_options(
2607 mut self,
2608 block_cache_size_mb: usize,
2609 block_size_bytes: usize,
2610 ) -> DBOptions {
2611 self.options
2612 .set_block_based_table_factory(&get_block_options(
2613 block_cache_size_mb,
2614 block_size_bytes,
2615 ));
2616 self
2617 }
2618
2619 pub fn disable_write_throttling(mut self) -> DBOptions {
2621 self.options.set_soft_pending_compaction_bytes_limit(0);
2622 self.options.set_hard_pending_compaction_bytes_limit(0);
2623 self
2624 }
2625}
2626
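/// Baseline options shared by all tables: compression, block cache, write
/// buffer and WAL sizing, several of which can be overridden through the env
/// vars referenced in this module. A sketch (not compiled as a doctest) of
/// feeding them into `open_cf_opts`; `db_path`, `metric_conf` and the column
/// family names are hypothetical caller-supplied values:
///
/// ```ignore
/// let db_opts = default_db_options();
/// let db = open_cf_opts(
///     &db_path,
///     Some(db_opts.options.clone()),
///     metric_conf,
///     &[
///         ("objects", db_opts.options.clone()),
///         ("transactions", db_opts.options),
///     ],
/// )?;
/// ```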
2627pub fn default_db_options() -> DBOptions {
2630 let mut opt = rocksdb::Options::default();
2631
2632 if let Some(limit) = fdlimit::raise_fd_limit() {
2636 opt.set_max_open_files((limit / 8) as i32);
2638 }
2639
2640 opt.set_table_cache_num_shard_bits(10);
2643
2644 opt.set_min_level_to_compress(2);
2646 opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
2647 opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
2648 opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
2649
2650 opt.set_db_write_buffer_size(
2662 read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
2663 * 1024
2664 * 1024,
2665 );
2666 opt.set_max_total_wal_size(
2667 read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
2668 );
2669
2670 opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
2672
2673 opt.set_enable_pipelined_write(true);
2674
2675 opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
2678
2679 opt.set_memtable_prefix_bloom_ratio(0.02);
2681
2682 DBOptions {
2683 options: opt,
2684 rw_options: ReadWriteOptions::default(),
2685 }
2686}
2687
2688fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
2689 let mut block_options = BlockBasedOptions::default();
2694 block_options.set_block_size(block_size_bytes);
2696 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
2698 block_options.set_bloom_filter(10.0, false);
2700 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
2702 block_options
2703}
2704
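/// Opens (or creates) a database in which every listed column family shares
/// the same options. A minimal sketch (not compiled as a doctest); `db_path`,
/// `metric_conf` and the column family names are hypothetical caller-supplied
/// values:
///
/// ```ignore
/// let db = open_cf(&db_path, None, metric_conf, &["objects", "transactions"])?;
/// ```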
2705#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
2708pub fn open_cf<P: AsRef<Path>>(
2709 path: P,
2710 db_options: Option<rocksdb::Options>,
2711 metric_conf: MetricConf,
2712 opt_cfs: &[&str],
2713) -> Result<Arc<RocksDB>, TypedStoreError> {
2714 let options = db_options.unwrap_or_else(|| default_db_options().options);
2715 let column_descriptors: Vec<_> = opt_cfs
2716 .iter()
2717 .map(|name| (*name, options.clone()))
2718 .collect();
2719 open_cf_opts(
2720 path,
2721 Some(options.clone()),
2722 metric_conf,
2723 &column_descriptors[..],
2724 )
2725}
2726
2727fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
2728 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2730 options.create_if_missing(true);
2731 options.create_missing_column_families(true);
2732 options
2733}
2734
2735#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2738pub fn open_cf_opts<P: AsRef<Path>>(
2739 path: P,
2740 db_options: Option<rocksdb::Options>,
2741 metric_conf: MetricConf,
2742 opt_cfs: &[(&str, rocksdb::Options)],
2743) -> Result<Arc<RocksDB>, TypedStoreError> {
2744 let path = path.as_ref();
2745 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2755 nondeterministic!({
2756 let options = prepare_db_options(db_options);
2757 let rocksdb = {
2758 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2759 &options,
2760 path,
2761 cfs.into_iter()
2762 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2763 )
2764 .map_err(typed_store_err_from_rocks_err)?
2765 };
2766 Ok(Arc::new(RocksDB::DBWithThreadMode(
2767 DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2768 )))
2769 })
2770}
2771
2772#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2775pub fn open_cf_opts_transactional<P: AsRef<Path>>(
2776 path: P,
2777 db_options: Option<rocksdb::Options>,
2778 metric_conf: MetricConf,
2779 opt_cfs: &[(&str, rocksdb::Options)],
2780) -> Result<Arc<RocksDB>, TypedStoreError> {
2781 let path = path.as_ref();
2782 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2783 nondeterministic!({
2785 let options = prepare_db_options(db_options);
2786 let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
2787 &options,
2788 path,
2789 cfs.into_iter()
2790 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2791 )
2792 .map_err(typed_store_err_from_rocks_err)?;
2793 Ok(Arc::new(RocksDB::OptimisticTransactionDB(
2794 OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
2795 )))
2796 })
2797}
2798
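/// Opens the database in read-only secondary mode. When no secondary path is
/// given, a `SECONDARY` directory is used as a sibling of the primary path,
/// and the handle is caught up with the primary before being returned. Sketch
/// only (not compiled as a doctest); `primary`, `secondary` and `metric_conf`
/// are hypothetical caller-supplied values:
///
/// ```ignore
/// let db = open_cf_opts_secondary(
///     &primary,
///     Some(&secondary),
///     None,
///     metric_conf,
///     &[("objects", default_db_options().options)],
/// )?;
/// ```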
2799pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2802 primary_path: P,
2803 secondary_path: Option<P>,
2804 db_options: Option<rocksdb::Options>,
2805 metric_conf: MetricConf,
2806 opt_cfs: &[(&str, rocksdb::Options)],
2807) -> Result<Arc<RocksDB>, TypedStoreError> {
2808 let primary_path = primary_path.as_ref();
2809 let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2810 nondeterministic!({
2812 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2814
2815 fdlimit::raise_fd_limit();
2816 options.set_max_open_files(-1);
2818
2819 let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2820 let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2821 .ok()
2822 .unwrap_or_default();
2823
2824 let default_db_options = default_db_options();
2825 for cf_key in cfs.iter() {
2827 if !opt_cfs.contains_key(&cf_key[..]) {
2828 opt_cfs.insert(cf_key, default_db_options.options.clone());
2829 }
2830 }
2831
2832 let primary_path = primary_path.to_path_buf();
2833 let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2834 let mut s = primary_path.clone();
2835 s.pop();
2836 s.push("SECONDARY");
2837 s
2838 });
2839
2840 let rocksdb = {
2841 options.create_if_missing(true);
2842 options.create_missing_column_families(true);
2843 let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2844 &options,
2845 &primary_path,
2846 &secondary_path,
2847 opt_cfs
2848 .iter()
2849 .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2850 )
2851 .map_err(typed_store_err_from_rocks_err)?;
2852 db.try_catch_up_with_primary()
2853 .map_err(typed_store_err_from_rocks_err)?;
2854 db
2855 };
2856 Ok(Arc::new(RocksDB::DBWithThreadMode(
2857 DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
2858 )))
2859 })
2860}
2861
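/// Lists the column families present at the given path, skipping RocksDB's
/// implicit `default` one. A minimal sketch (not compiled as a doctest);
/// `db_path` is a hypothetical `PathBuf`:
///
/// ```ignore
/// for table in list_tables(db_path)? {
///     println!("column family: {table}");
/// }
/// ```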
2862pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
2863 const DB_DEFAULT_CF_NAME: &str = "default";
2864
2865 let opts = rocksdb::Options::default();
2866 rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
2867 .map_err(|e| e.into())
2868 .map(|q| {
2869 q.iter()
2870 .filter_map(|s| {
2871 if s != DB_DEFAULT_CF_NAME {
2873 Some(s.clone())
2874 } else {
2875 None
2876 }
2877 })
2878 .collect()
2879 })
2880}
2881
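/// Serializes keys with big-endian, fixed-width integer encoding so that the
/// byte-wise ordering RocksDB sorts by matches the natural ordering of the
/// typed keys, which the bounded and range iterators above rely on. A minimal
/// sketch (not compiled as a doctest):
///
/// ```ignore
/// let lo = be_fix_int_ser(&1u64)?;
/// let hi = be_fix_int_ser(&2u64)?;
/// assert!(lo < hi);               // byte order agrees with numeric order
/// assert_eq!(lo.len(), hi.len()); // fixed-width encoding
/// ```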
2882#[inline]
2884pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
2885where
2886 S: ?Sized + serde::Serialize,
2887{
2888 bincode::DefaultOptions::new()
2889 .with_big_endian()
2890 .with_fixint_encoding()
2891 .serialize(t)
2892 .map_err(typed_store_err_from_bincode_err)
2893}
2894
2895#[derive(Clone)]
2896pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
2897impl DBMapTableConfigMap {
2898 pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
2899 Self(map)
2900 }
2901
2902 pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
2903 self.0.clone()
2904 }
2905}
2906
2907pub enum RocksDBAccessType {
2908 Primary,
2909 Secondary(Option<PathBuf>),
2910}
2911
2912pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
2913 rocksdb::DB::destroy(&rocksdb::Options::default(), path)
2914}
2915
2916fn populate_missing_cfs(
2917 input_cfs: &[(&str, rocksdb::Options)],
2918 path: &Path,
2919) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2920 let mut cfs = vec![];
2921 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2922 let existing_cfs =
2923 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2924 .ok()
2925 .unwrap_or_default();
2926
2927 for cf_name in existing_cfs {
2928 if !input_cf_index.contains(&cf_name[..]) {
2929 cfs.push((cf_name, rocksdb::Options::default()));
2930 }
2931 }
2932 cfs.extend(
2933 input_cfs
2934 .iter()
2935 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2936 );
2937 Ok(cfs)
2938}
2939
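/// Treats the byte slice as one big-endian integer and increments it in place,
/// saturating at the all-0xFF maximum, so an inclusive upper bound can be
/// widened into the exclusive form RocksDB range reads expect. A minimal
/// sketch (not compiled as a doctest):
///
/// ```ignore
/// let mut key = vec![0x00, 0xff];
/// big_endian_saturating_add_one(&mut key);
/// assert_eq!(key, vec![0x01, 0x00]);
///
/// let mut max = vec![0xff, 0xff];
/// big_endian_saturating_add_one(&mut max);
/// assert_eq!(max, vec![0xff, 0xff]); // already at the maximum: unchanged
/// ```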
2940fn big_endian_saturating_add_one(v: &mut [u8]) {
2944 if is_max(v) {
2945 return;
2946 }
2947 for i in (0..v.len()).rev() {
2948 if v[i] == u8::MAX {
2949 v[i] = 0;
2950 } else {
2951 v[i] += 1;
2952 break;
2953 }
2954 }
2955}
2956
2957fn is_max(v: &[u8]) -> bool {
2959 v.iter().all(|&x| x == u8::MAX)
2960}
2961
2962#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
2965#[test]
2966fn test_helpers() {
2967 let v = vec![];
2968 assert!(is_max(&v));
2969
2970 fn check_add(v: Vec<u8>) {
2971 let mut v = v;
2972 let num = Num32::from_big_endian(&v);
2973 big_endian_saturating_add_one(&mut v);
2974 assert!(num + 1 == Num32::from_big_endian(&v));
2975 }
2976
2977 uint::construct_uint! {
2978 struct Num32(4);
2980 }
2981
2982 let mut v = vec![255; 32];
2983 big_endian_saturating_add_one(&mut v);
2984 assert!(Num32::MAX == Num32::from_big_endian(&v));
2985
2986 check_add(vec![1; 32]);
2987 check_add(vec![6; 32]);
2988 check_add(vec![254; 32]);
2989
2990}