pub mod errors;
pub(crate) mod iter;
pub(crate) mod keys;
pub(crate) mod safe_iter;
pub mod util;
pub(crate) mod values;

use std::{
    borrow::Borrow,
    collections::{BTreeMap, HashSet},
    env,
    ffi::CStr,
    marker::PhantomData,
    ops::{Bound, RangeBounds},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use bincode::Options;
use collectable::TryExtend;
use iota_macros::{fail_point, nondeterministic};
use itertools::Itertools;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::{
    AsColumnFamilyRef, BlockBasedOptions, BottommostLevelCompaction, CStrLike, Cache,
    ColumnFamilyDescriptor, CompactOptions, DBPinnableSlice, DBWithThreadMode, Error, ErrorKind,
    IteratorMode, LiveFile, MultiThreaded, OptimisticTransactionDB, OptimisticTransactionOptions,
    ReadOptions, SnapshotWithThreadMode, Transaction, WriteBatch, WriteBatchWithTransaction,
    WriteOptions, checkpoint::Checkpoint, properties,
};
use serde::{Serialize, de::DeserializeOwned};
use tap::TapFallible;
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};

use self::{iter::Iter, keys::Keys, values::Values};
use crate::{
    TypedStoreError,
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    rocks::{
        errors::{
            typed_store_err_from_bcs_err, typed_store_err_from_bincode_err,
            typed_store_err_from_rocks_err,
        },
        safe_iter::SafeIter,
    },
    traits::{Map, TableSummary},
};

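// Environment variables that can override the RocksDB tuning defaults below
// at startup; values whose names end in `_MB` are in megabytes.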
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";

const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";

// SAFETY: the property name below is NUL-terminated, as required by
// `CStr::from_bytes_with_nul_unchecked`.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

#[cfg(test)]
mod tests;

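/// Opens one or more `DBMap` handles over existing column families of an
/// already-opened database. A minimal usage sketch (the database handle, the
/// column-family names, and the key/value types are hypothetical):
///
/// ```ignore
/// let (users, orders) = reopen!(&db, "users";<UserId, User>, "orders";<OrderId, Order>);
/// ```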
#[macro_export]
macro_rules! reopen {
    ( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
        (
            $(
                DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
            ),*
        )
    };
}

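/// Evaluates `$transaction` repeatedly while it returns
/// `Err(TypedStoreError::RetryableTransaction)`, sleeping a random 0-50 ms
/// between attempts; the single-argument form retries up to `Some(20)` times,
/// and `$max_retries` of `None` retries forever. Must be used in an async
/// context, since it awaits between retries. A usage sketch (the closure body
/// is hypothetical):
///
/// ```ignore
/// let result = retry_transaction!({
///     let mut txn = db.transaction()?;
///     txn.insert_batch(&map, [(key, value)])?;
///     txn.commit()
/// });
/// ```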
#[macro_export]
macro_rules! retry_transaction {
    ($transaction:expr) => {
        retry_transaction!($transaction, Some(20))
    };

    (
        $transaction:expr,
        $max_retries:expr $(,)?
    ) => {{
        use rand::{
            distributions::{Distribution, Uniform},
            rngs::ThreadRng,
        };
        use tokio::time::{Duration, sleep};
        use tracing::{error, info};

        let mut retries = 0;
        let max_retries = $max_retries;
        loop {
            let status = $transaction;
            match status {
                Err(TypedStoreError::RetryableTransaction) => {
                    retries += 1;
                    // Randomized delay to reduce contention between retries.
                    let delay = {
                        let mut rng = ThreadRng::default();
                        Duration::from_millis(Uniform::new(0, 50).sample(&mut rng))
                    };
                    if let Some(max_retries) = max_retries {
                        if retries > max_retries {
                            error!(?max_retries, "max retries exceeded");
                            break status;
                        }
                    }
                    if retries > 10 {
                        error!(?delay, ?retries, "excessive transaction retries...");
                    } else {
                        info!(
                            ?delay,
                            ?retries,
                            "transaction write conflict detected, sleeping"
                        );
                    }
                    sleep(delay).await;
                }
                _ => break status,
            }
        }
    }};
}

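/// Like `retry_transaction!`, but never gives up on retryable errors.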
#[macro_export]
macro_rules! retry_transaction_forever {
    ($transaction:expr) => {
        $crate::retry_transaction!($transaction, None)
    };
}

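// Thin wrappers around the two rocksdb database flavors. They keep the
// active-databases gauge accurate by incrementing it on construction and
// decrementing it on drop.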
#[derive(Debug)]
pub struct DBWithThreadModeWrapper {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl DBWithThreadModeWrapper {
    fn new(
        underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for DBWithThreadModeWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

#[derive(Debug)]
pub struct OptimisticTransactionDBWrapper {
    pub underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
    pub metric_conf: MetricConf,
    pub db_path: PathBuf,
}

impl OptimisticTransactionDBWrapper {
    fn new(
        underlying: rocksdb::OptimisticTransactionDB<MultiThreaded>,
        metric_conf: MetricConf,
        db_path: PathBuf,
    ) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            underlying,
            metric_conf,
            db_path,
        }
    }
}

impl Drop for OptimisticTransactionDBWrapper {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

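/// A handle to either a plain multi-threaded RocksDB instance or an
/// optimistic-transaction instance; most operations delegate to whichever
/// variant is in use.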
#[derive(Debug)]
pub enum RocksDB {
    DBWithThreadMode(DBWithThreadModeWrapper),
    OptimisticTransactionDB(OptimisticTransactionDBWrapper),
}

macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DBWithThreadMode(d) => d.underlying.$method($($args),*),
            Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*),
        }
    }
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        // Stop background compactions and flushes, waiting for them to finish.
        delegate_call!(self.cancel_all_background_work(true))
    }
}

impl RocksDB {
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        delegate_call!(self.get(key))
    }

    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        delegate_call!(self.multi_get_cf_opt(keys, readopts))
    }

    pub fn batched_multi_get_cf_opt<I, K>(
        &self,
        cf: &impl AsColumnFamilyRef,
        keys: I,
        sorted_input: bool,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts))
    }

    pub fn property_int_value_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        name: impl CStrLike,
    ) -> Result<Option<u64>, rocksdb::Error> {
        delegate_call!(self.property_int_value_cf(cf, name))
    }

    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error> {
        delegate_call!(self.get_pinned_cf_opt(cf, key, readopts))
    }

    pub fn cf_handle(&self, name: &str) -> Option<Arc<rocksdb::BoundColumnFamily<'_>>> {
        delegate_call!(self.cf_handle(name))
    }

    pub fn create_cf<N: AsRef<str>>(
        &self,
        name: N,
        opts: &rocksdb::Options,
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.create_cf(name, opts))
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        delegate_call!(self.drop_cf(name))
    }

    pub fn delete_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error> {
        fail_point!("delete-cf-before");
        let ret = delegate_call!(self.delete_cf_opt(cf, key, writeopts));
        fail_point!("delete-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn path(&self) -> &Path {
        delegate_call!(self.path())
    }

    pub fn put_cf<K, V>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        value: V,
        writeopts: &WriteOptions,
    ) -> Result<(), rocksdb::Error>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        fail_point!("put-cf-before");
        let ret = delegate_call!(self.put_cf_opt(cf, key, value, writeopts));
        fail_point!("put-cf-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts))
    }

    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        delegate_call!(self.try_catch_up_with_primary())
    }

    pub fn write(
        &self,
        batch: RocksDBBatch,
        writeopts: &WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (self, batch) {
            (RocksDB::DBWithThreadMode(db), RocksDBBatch::Regular(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            (RocksDB::OptimisticTransactionDB(db), RocksDBBatch::Transactional(batch)) => {
                db.underlying
                    .write_opt(batch, writeopts)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            }
            _ => Err(TypedStoreError::RocksDB(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[expect(clippy::let_and_return)]
        ret
    }

    pub fn transaction_without_snapshot(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => Ok(db.underlying.transaction()),
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported by OptimisticTransactionDB")
            }
        }
    }

    pub fn transaction(
        &self,
    ) -> Result<Transaction<'_, rocksdb::OptimisticTransactionDB>, TypedStoreError> {
        match self {
            Self::OptimisticTransactionDB(db) => {
                let mut tx_opts = OptimisticTransactionOptions::new();
                tx_opts.set_snapshot(true);

                Ok(db
                    .underlying
                    .transaction_opt(&WriteOptions::default(), &tx_opts))
            }
            Self::DBWithThreadMode(_) => {
                panic!("transactions are only supported by OptimisticTransactionDB")
            }
        }
    }

    pub fn raw_iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
    ) -> RocksDBRawIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBRawIter::DB(db.underlying.raw_iterator_cf_opt(cf_handle, readopts))
            }
            Self::OptimisticTransactionDB(db) => RocksDBRawIter::OptimisticTransactionDB(
                db.underlying.raw_iterator_cf_opt(cf_handle, readopts),
            ),
        }
    }

    pub fn iterator_cf<'a: 'b, 'b>(
        &'a self,
        cf_handle: &impl AsColumnFamilyRef,
        readopts: ReadOptions,
        mode: IteratorMode<'_>,
    ) -> RocksDBIter<'b> {
        match self {
            Self::DBWithThreadMode(db) => {
                RocksDBIter::DB(db.underlying.iterator_cf_opt(cf_handle, readopts, mode))
            }
            Self::OptimisticTransactionDB(db) => RocksDBIter::OptimisticTransactionDB(
                db.underlying.iterator_cf_opt(cf_handle, readopts, mode),
            ),
        }
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        delegate_call!(self.compact_range_cf(cf, start, end))
    }

    pub fn compact_range_to_bottom<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        start: Option<K>,
        end: Option<K>,
    ) {
        let opt = &mut CompactOptions::default();
        opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized);
        delegate_call!(self.compact_range_cf_opt(cf, start, end, opt))
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn snapshot(&self) -> RocksDBSnapshot<'_> {
        match self {
            Self::DBWithThreadMode(d) => RocksDBSnapshot::DBWithThreadMode(d.underlying.snapshot()),
            Self::OptimisticTransactionDB(d) => {
                RocksDBSnapshot::OptimisticTransactionDB(d.underlying.snapshot())
            }
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        let checkpoint = match self {
            Self::DBWithThreadMode(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
            Self::OptimisticTransactionDB(d) => {
                Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err)?
            }
        };
        checkpoint
            .create_checkpoint(path)
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))?;
        Ok(())
    }

    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> {
        delegate_call!(self.flush_cf(cf))
    }

    pub fn set_options_cf(
        &self,
        cf: &impl AsColumnFamilyRef,
        opts: &[(&str, &str)],
    ) -> Result<(), rocksdb::Error> {
        delegate_call!(self.set_options_cf(cf, opts))
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.read_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.read_sample_interval.new_from_self(),
        }
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.write_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.write_sample_interval.new_from_self(),
        }
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        match self {
            Self::DBWithThreadMode(d) => d.metric_conf.iter_sample_interval.new_from_self(),
            Self::OptimisticTransactionDB(d) => d.metric_conf.iter_sample_interval.new_from_self(),
        }
    }

    pub fn db_name(&self) -> String {
        let name = match self {
            Self::DBWithThreadMode(d) => &d.metric_conf.db_name,
            Self::OptimisticTransactionDB(d) => &d.metric_conf.db_name,
        };
        if name.is_empty() {
            self.default_db_name()
        } else {
            name.clone()
        }
    }

    fn default_db_name(&self) -> String {
        self.path()
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("unknown")
            .to_string()
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        delegate_call!(self.live_files())
    }
}

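/// A point-in-time snapshot taken from either database flavor; reads through
/// the snapshot are consistent and unaffected by later writes.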
pub enum RocksDBSnapshot<'a> {
    DBWithThreadMode(rocksdb::Snapshot<'a>),
    OptimisticTransactionDB(SnapshotWithThreadMode<'a, OptimisticTransactionDB>),
}

impl<'a> RocksDBSnapshot<'a> {
    pub fn multi_get_cf_opt<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
        readopts: ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf_opt(keys, readopts),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf_opt(keys, readopts),
        }
    }

    pub fn multi_get_cf<'b: 'a, K, I, W>(
        &'a self,
        keys: I,
    ) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: 'b + AsColumnFamilyRef,
    {
        match self {
            Self::DBWithThreadMode(s) => s.multi_get_cf(keys),
            Self::OptimisticTransactionDB(s) => s.multi_get_cf(keys),
        }
    }
}

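/// A write batch of the flavor matching the underlying database;
/// `RocksDB::write` rejects a batch whose flavor does not match.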
pub enum RocksDBBatch {
    Regular(rocksdb::WriteBatch),
    Transactional(rocksdb::WriteBatchWithTransaction<true>),
}

macro_rules! delegate_batch_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::Regular(b) => b.$method($($args),*),
            Self::Transactional(b) => b.$method($($args),*),
        }
    }
}

impl RocksDBBatch {
    fn size_in_bytes(&self) -> usize {
        delegate_batch_call!(self.size_in_bytes())
    }

    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        delegate_batch_call!(self.delete_cf(cf, key))
    }

    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.put_cf(cf, key, value))
    }

    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        delegate_batch_call!(self.merge_cf(cf, key, value))
    }

    pub fn delete_range_cf<K: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), TypedStoreError> {
        match self {
            Self::Regular(batch) => {
                batch.delete_range_cf(cf, from, to);
                Ok(())
            }
            Self::Transactional(_) => {
                panic!("delete_range_cf is not supported in transactional batches")
            }
        }
    }
}

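/// Per-database metrics configuration: the name used for metric labels plus
/// sampling intervals that throttle how often perf contexts are collected for
/// reads, writes, and iteration.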
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

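/// A typed map over a RocksDB column family: keys are encoded with a
/// big-endian fixed-int bincode layout (so lexicographic byte order matches
/// key order) and values with BCS. A minimal usage sketch (the path, column
/// family, and types are hypothetical):
///
/// ```ignore
/// let map: DBMap<u64, String> = DBMap::open(
///     "/tmp/db",
///     MetricConf::new("example"),
///     None,
///     Some("my_cf"),
///     &ReadWriteOptions::default(),
/// )?;
/// map.insert(&1, &"one".to_string())?;
/// assert_eq!(map.get(&1)?, Some("one".to_string()));
/// ```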
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub rocksdb: Arc<RocksDB>,
    _phantom: PhantomData<fn(K) -> V>,
    // The column family under which the map is stored.
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

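// SAFETY: `DBMap` never stores `K` or `V` values directly; the type
// parameters appear only behind `PhantomData<fn(K) -> V>`, so sending the map
// across threads cannot move any non-`Send` data.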
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<RocksDB>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = db.clone();
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated {
            // Periodically report column-family metrics until the cancel
            // handle held by the map is dropped.
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            let db = db_cloned.clone();
                            let cf = cf.clone();
                            let db_metrics = db_metrics.clone();
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                Self::report_metrics(&db, &cf, &db_metrics);
                            }).await {
                                error!("Failed to log metrics with error: {}", e);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Stopping the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            rocksdb: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    /// Opens a database at `path` with the provided options, operating under
    /// the given column family (or the default one if none is given).
    #[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cf), err)]
    pub fn open<P: AsRef<Path>>(
        path: P,
        metric_conf: MetricConf,
        db_options: Option<rocksdb::Options>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
        let cfs = vec![cf_key];
        let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
        Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
    }

    /// Reopens an already-open database as a typed map operating under a
    /// specific column family; the column family must already exist, or an
    /// `UnregisteredColumn` error is returned.
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<RocksDB>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();

        db.cf_handle(&cf_key)
            .ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

        Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
    }

    pub fn batch(&self) -> DBBatch {
        let batch = match *self.rocksdb {
            RocksDB::DBWithThreadMode(_) => RocksDBBatch::Regular(WriteBatch::default()),
            RocksDB::OptimisticTransactionDB(_) => {
                RocksDBBatch::Transactional(WriteBatchWithTransaction::<true>::default())
            }
        };
        DBBatch::new(
            &self.rocksdb,
            batch,
            self.opts.writeopts(),
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        let cf = self
            .rocksdb
            .cf_handle(cf_name)
            .expect("compact range: column family does not exist");
        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
        Ok(())
    }

    pub fn compact_range_to_bottom<J: Serialize>(
        &self,
        start: &J,
        end: &J,
    ) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start)?;
        let to_buf = be_fix_int_ser(end)?;
        self.rocksdb
            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
        self.rocksdb
            .cf_handle(&self.cf)
            .expect("Map-keying column family should have been checked at DB creation")
    }

    pub fn iterator_cf(&self) -> RocksDBIter<'_> {
        self.rocksdb
            .iterator_cf(&self.cf(), self.opts.readopts(), IteratorMode::Start)
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.rocksdb
            .flush_cf(&self.cf())
            .map_err(|e| TypedStoreError::RocksDB(e.into_string()))
    }

    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
        self.rocksdb.set_options_cf(&self.cf(), opts)
    }

    fn get_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &'static std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDB(e.into_string())),
        }
    }

    /// Returns the raw pinned values for the given keys, preserving order.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<DBPinnableSlice<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| be_fix_int_ser(k.borrow()))
            .collect();
        let results: Result<Vec<_>, TypedStoreError> = self
            .rocksdb
            .batched_multi_get_cf_opt(
                &self.cf(),
                keys_bytes?,
                // `sorted_input`: the keys are not guaranteed to be sorted.
                false,
                &self.opts.readopts(),
            )
            .into_iter()
            .map(|r| r.map_err(|e| TypedStoreError::RocksDB(e.into_string())))
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn report_metrics(rocksdb: &Arc<RocksDB>, cf_name: &str, db_metrics: &Arc<DBMetrics>) {
        let cf = rocksdb.cf_handle(cf_name).expect("Failed to get cf");
        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_PENDING_COMPACTION_BYTES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn transaction(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new(&self.rocksdb)
    }

    pub fn transaction_without_snapshot(&self) -> Result<DBTransaction<'_>, TypedStoreError> {
        DBTransaction::new_without_snapshot(&self.rocksdb)
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.rocksdb.checkpoint(path)
    }

    pub fn snapshot(&self) -> Result<RocksDBSnapshot<'_>, TypedStoreError> {
        Ok(self.rocksdb.snapshot())
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary> {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let iter = self.iterator_cf().map(Result::unwrap);
        for (key, value) in iter {
            num_keys += 1;
            key_bytes_total += key.len();
            value_bytes_total += value.len();
            key_hist.record(key.len() as u64)?;
            value_hist.record(value.len() as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    /// Creates read options whose iteration range is restricted to the given
    /// serialized bounds; the lower bound is inclusive and the upper bound is
    /// exclusive, per RocksDB semantics.
    fn create_read_options_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();
        if let Some(lower_bound) = lower_bound {
            let key_buf = be_fix_int_ser(&lower_bound).unwrap();
            readopts.set_iterate_lower_bound(key_buf);
        }
        if let Some(upper_bound) = upper_bound {
            let key_buf = be_fix_int_ser(&upper_bound).unwrap();
            readopts.set_iterate_upper_bound(key_buf);
        }
        readopts
    }

    fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
    where
        K: Serialize,
    {
        let mut readopts = self.opts.readopts();

        let lower_bound = range.start_bound();
        let upper_bound = range.end_bound();

        match lower_bound {
            Bound::Included(lower_bound) => {
                // RocksDB's lower bound is inclusive, so use the key as-is.
                let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Excluded(lower_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&lower_bound).expect("Serialization must not fail");

                // Bump the key by one to turn the inclusive RocksDB lower
                // bound into an exclusive one.
                big_endian_saturating_add_one(&mut key_buf);
                readopts.set_iterate_lower_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        match upper_bound {
            Bound::Included(upper_bound) => {
                let mut key_buf =
                    be_fix_int_ser(&upper_bound).expect("Serialization must not fail");

                // RocksDB's upper bound is exclusive, so bump the key by one
                // to include it; if the key is already maximal, leave the
                // bound unset (unbounded above).
                if !is_max(&key_buf) {
                    big_endian_saturating_add_one(&mut key_buf);
                    readopts.set_iterate_upper_bound(key_buf);
                }
            }
            Bound::Excluded(upper_bound) => {
                // RocksDB's upper bound is already exclusive, so use the key
                // as-is.
                let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
                readopts.set_iterate_upper_bound(key_buf);
            }
            Bound::Unbounded => (),
        };

        readopts
    }
}

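/// A batch of writes (puts, deletes, range deletes, merges) applied
/// atomically on `DBBatch::write`. A minimal sketch over a hypothetical
/// `map: DBMap<u64, String>`:
///
/// ```ignore
/// let mut batch = map.batch();
/// batch.insert_batch(&map, [(1, "one".to_string())])?;
/// batch.delete_batch(&map, [2])?;
/// batch.write()?; // all or nothing
/// ```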
pub struct DBBatch {
    rocksdb: Arc<RocksDB>,
    batch: RocksDBBatch,
    opts: WriteOptions,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    /// Creates a new batch associated with a database reference.
    pub fn new(
        dbref: &Arc<RocksDB>,
        batch: RocksDBBatch,
        opts: WriteOptions,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            rocksdb: dbref.clone(),
            batch,
            opts,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consumes the batch and writes it to the database atomically.
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let db_name = self.rocksdb.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.batch.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.rocksdb.write(self.batch, &self.opts)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.batch.size_in_bytes()
    }
}

impl DBBatch {
    /// Deletes a set of keys given as an iterator.
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.delete_cf(&db.cf(), k_buf);

                Ok(())
            })?;
        Ok(())
    }

    /// Schedules a deletion of the keys in the range `[from, to)`.
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from)?;
        let to_buf = be_fix_int_ser(to)?;

        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf)?;
        Ok(())
    }

    /// Inserts a series of key-value pairs.
    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                self.batch.put_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    /// Merges a series of key-value pairs using the column family's
    /// configured merge operator.
    pub fn merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.batch.merge_cf(&db.cf(), k_buf, v_buf);
                Ok(())
            })?;
        Ok(self)
    }

    /// Similar to `merge_batch`, but the values are already serialized.
    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, B)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.batch.merge_cf(&db.cf(), k_buf, v);
                Ok(())
            })?;
        Ok(self)
    }
}

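/// An optimistic transaction over the underlying
/// `rocksdb::OptimisticTransactionDB`. Commits fail with
/// `TypedStoreError::RetryableTransaction` on write conflicts, so commits are
/// usually wrapped in `retry_transaction!`. A sketch over a hypothetical
/// `map: DBMap<u64, String>` backed by a transactional database:
///
/// ```ignore
/// let result = retry_transaction!({
///     let mut txn = map.transaction()?;
///     txn.insert_batch(&map, [(1, "one".to_string())])?;
///     txn.commit()
/// });
/// ```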
pub struct DBTransaction<'a> {
    rocksdb: Arc<RocksDB>,
    transaction: Transaction<'a, rocksdb::OptimisticTransactionDB>,
}

impl<'a> DBTransaction<'a> {
    pub fn new(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction()?,
        })
    }

    pub fn new_without_snapshot(db: &'a Arc<RocksDB>) -> Result<Self, TypedStoreError> {
        Ok(Self {
            rocksdb: db.clone(),
            transaction: db.transaction_without_snapshot()?,
        })
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                self.transaction
                    .put_cf(&db.cf(), k_buf, v_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    /// Deletes a set of keys given as an iterator.
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow())?;
                self.transaction
                    .delete_cf(&db.cf(), k_buf)
                    .map_err(typed_store_err_from_rocks_err)?;
                Ok(())
            })?;
        Ok(self)
    }

    pub fn snapshot(
        &self,
    ) -> rocksdb::SnapshotWithThreadMode<'_, Transaction<'a, rocksdb::OptimisticTransactionDB>>
    {
        self.transaction.snapshot()
    }

    pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let k_buf = be_fix_int_ser(key)?;
        match self
            .transaction
            .get_for_update_cf_opt(&db.cf(), k_buf, true, &db.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?
        {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    pub fn get<K: Serialize + DeserializeOwned, V: Serialize + DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        key: &K,
    ) -> Result<Option<V>, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        self.transaction
            .get_cf_opt(&db.cf(), key_buf, &db.opts.readopts())
            .map_err(|e| TypedStoreError::RocksDB(e.to_string()))
            // Note: values that fail to deserialize are silently mapped to
            // `None` here.
            .map(|res| res.and_then(|bytes| bcs::from_bytes::<V>(&bytes).ok()))
    }

    pub fn multi_get<J: Borrow<K>, K: Serialize + DeserializeOwned, V: DeserializeOwned>(
        &self,
        db: &DBMap<K, V>,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError> {
        let cf = db.cf();
        let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
            .into_iter()
            .map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
            .collect();

        let results = self
            .transaction
            .multi_get_cf_opt(keys_bytes?, &db.opts.readopts());

        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(
                |value_byte| match value_byte.map_err(typed_store_err_from_rocks_err)? {
                    Some(data) => Ok(Some(
                        bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                    )),
                    None => Ok(None),
                },
            )
            .collect();

        values_parsed
    }

    pub fn iter<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Iter<'a, K, V> {
        let db_iter = self
            .transaction
            .raw_iterator_cf_opt(&db.cf(), db.opts.readopts());
        Iter::new(
            db.cf.clone(),
            RocksDBRawIter::OptimisticTransaction(db_iter),
            None,
            None,
            None,
            None,
            None,
        )
    }

    pub fn keys<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Keys<'a, K> {
        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
            self.transaction
                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
        );
        db_iter.seek_to_first();

        Keys::new(db_iter)
    }

    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
        &'a self,
        db: &DBMap<K, V>,
    ) -> Values<'a, V> {
        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
            self.transaction
                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
        );
        db_iter.seek_to_first();

        Values::new(db_iter)
    }

    pub fn commit(self) -> Result<(), TypedStoreError> {
        fail_point!("transaction-commit");
        self.transaction.commit().map_err(|e| match e.kind() {
            // A write conflict was detected; the caller may retry the
            // transaction (e.g. via `retry_transaction!`).
            ErrorKind::Busy | ErrorKind::TryAgain => TypedStoreError::RetryableTransaction,
            _ => typed_store_err_from_rocks_err(e),
        })?;
        Ok(())
    }
}

macro_rules! delegate_iter_call {
    ($self:ident.$method:ident($($args:ident),*)) => {
        match $self {
            Self::DB(db) => db.$method($($args),*),
            Self::OptimisticTransactionDB(db) => db.$method($($args),*),
            Self::OptimisticTransaction(db) => db.$method($($args),*),
        }
    }
}

pub enum RocksDBRawIter<'a> {
    DB(rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBRawIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
    OptimisticTransaction(
        rocksdb::DBRawIteratorWithThreadMode<
            'a,
            Transaction<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
        >,
    ),
}

impl RocksDBRawIter<'_> {
    pub fn valid(&self) -> bool {
        delegate_iter_call!(self.valid())
    }
    pub fn key(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.key())
    }
    pub fn value(&self) -> Option<&[u8]> {
        delegate_iter_call!(self.value())
    }
    pub fn next(&mut self) {
        delegate_iter_call!(self.next())
    }
    pub fn prev(&mut self) {
        delegate_iter_call!(self.prev())
    }
    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek(key))
    }
    pub fn seek_to_last(&mut self) {
        delegate_iter_call!(self.seek_to_last())
    }
    pub fn seek_to_first(&mut self) {
        delegate_iter_call!(self.seek_to_first())
    }
    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
        delegate_iter_call!(self.seek_for_prev(key))
    }
    pub fn status(&self) -> Result<(), rocksdb::Error> {
        delegate_iter_call!(self.status())
    }
}

pub enum RocksDBIter<'a> {
    DB(rocksdb::DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>),
    OptimisticTransactionDB(
        rocksdb::DBIteratorWithThreadMode<'a, rocksdb::OptimisticTransactionDB<MultiThreaded>>,
    ),
}

impl Iterator for RocksDBIter<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), Error>;
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::DB(db) => db.next(),
            Self::OptimisticTransactionDB(db) => db.next(),
        }
    }
}

impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;
    type Iterator = Iter<'a, K, V>;
    type SafeIterator = SafeIter<'a, K, V>;
    type Keys = Keys<'a, K>;
    type Values = Values<'a, V>;

    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key)?;
        let readopts = self.opts.readopts();
        // `key_may_exist_cf` can return false positives, so a hit must be
        // confirmed by an actual read; a miss, however, is definitive.
        Ok(self
            .rocksdb
            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
            && self
                .rocksdb
                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .is_some())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let res = self
            .rocksdb
            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => Ok(Some(
                bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
            )),
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let res = self
            .rocksdb
            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => Ok(Some(data.to_vec())),
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.rocksdb
            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;

        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key)?;
        self.rocksdb
            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
            .map_err(typed_store_err_from_rocks_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    /// Clears the map by dropping and re-creating the underlying column
    /// family; this is not safe with respect to concurrent readers or writers
    /// of the same column family.
    #[instrument(level = "trace", skip_all, err)]
    fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
        let _ = self.rocksdb.drop_cf(&self.cf);
        self.rocksdb
            .create_cf(self.cf.clone(), &default_db_options().options)
            .map_err(typed_store_err_from_rocks_err)?;
        Ok(())
    }

    /// Schedules a range deletion spanning the keys currently in the map
    /// (from the first key to the last) and writes it as a single batch.
    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let mut iter = self.unbounded_iter().seek_to_first();
        let first_key = iter.next().map(|(k, _v)| k);
        let last_key = iter.skip_to_last().next().map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

    fn is_empty(&self) -> bool {
        self.safe_iter().next().is_none()
    }

    /// Returns an unbounded iterator visiting each key-value pair in the map.
    fn unbounded_iter(&'a self) -> Self::Iterator {
        let db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        Iter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    /// Returns an iterator visiting each key-value pair within the given
    /// optional lower and upper bounds.
    fn iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Self::Iterator {
        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        Iter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    /// Returns an iterator visiting each key-value pair within the given
    /// range.
    fn range_iter(&'a self, range: impl RangeBounds<K>) -> Self::Iterator {
        let readopts = self.create_read_options_with_range(range);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        Iter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_iter(&'a self) -> Self::SafeIterator {
        let db_iter = self
            .rocksdb
            .raw_iterator_cf(&self.cf(), self.opts.readopts());
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
        let readopts = self.create_read_options_with_range(range);
        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
        let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
        SafeIter::new(
            self.cf.clone(),
            db_iter,
            _timer,
            _perf_ctx,
            bytes_scanned,
            keys_scanned,
            Some(self.db_metrics.clone()),
        )
    }

2182 fn keys(&'a self) -> Self::Keys {
2183 let mut db_iter = self
2184 .rocksdb
2185 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2186 db_iter.seek_to_first();
2187
2188 Keys::new(db_iter)
2189 }
2190
2191 fn values(&'a self) -> Self::Values {
2192 let mut db_iter = self
2193 .rocksdb
2194 .raw_iterator_cf(&self.cf(), self.opts.readopts());
2195 db_iter.seek_to_first();
2196
2197 Values::new(db_iter)
2198 }
2199
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get_raw_bytes<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self
            .multi_get_pinned(keys)?
            .into_iter()
            .map(|val| val.map(|v| v.to_vec()))
            .collect();
        Ok(results)
    }

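    /// Returns the deserialized value for every given key, or `None` for keys
    /// that are absent.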
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

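    /// Like `multi_get`, but reads the keys in chunks of `chunk_size` against
    /// a single consistent snapshot.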
    #[instrument(level = "trace", skip_all, err)]
    fn chunked_multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
        chunk_size: usize,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let cf = self.cf();
        let keys_bytes = keys
            .into_iter()
            .map(|k| (&cf, be_fix_int_ser(k.borrow()).unwrap()));
        let chunked_keys = keys_bytes.into_iter().chunks(chunk_size);
        let snapshot = self.snapshot()?;
        let mut results = vec![];
        for chunk in chunked_keys.into_iter() {
            let chunk_result = snapshot.multi_get_cf(chunk);
            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
                .into_iter()
                .map(|value_byte| {
                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
                    match value_byte {
                        Some(data) => Ok(Some(
                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                        )),
                        None => Ok(None),
                    }
                })
                .collect();
            results.extend(values_parsed?);
        }
        Ok(results)
    }

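    /// Inserts the given key-value pairs in a single atomic write batch.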
    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

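    /// Removes the given keys in a single atomic write batch.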
    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

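    /// Tries to catch up a secondary (read-only) instance with the updates the
    /// primary instance has applied so far.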
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        self.rocksdb
            .try_catch_up_with_primary()
            .map_err(typed_store_err_from_rocks_err)
    }
}

impl<J, K, U, V> TryExtend<(J, U)> for DBMap<K, V>
where
    J: Borrow<K>,
    U: Borrow<V>,
    K: Serialize,
    V: Serialize,
{
    type Error = TypedStoreError;

    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
    where
        T: Iterator<Item = (J, U)>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, iter)?;
        batch.write()
    }

    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
        let slice_of_refs = slice.iter().map(|(k, v)| (k.borrow(), v.borrow()));
        let mut batch = self.batch();
        batch.insert_batch(self, slice_of_refs)?;
        batch.write()
    }
}

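/// Reads a `usize` from the given environment variable, returning `None` if
/// the variable is unset or does not parse as a valid `usize`.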
pub fn read_size_from_env(var_name: &str) -> Option<usize> {
    env::var(var_name)
        .ok()?
        .parse::<usize>()
        .tap_err(|e| {
            warn!(
                "Env var {} does not contain valid usize integer: {}",
                var_name, e
            )
        })
        .ok()
}

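/// Read and write options applied to the RocksDB operations issued through a
/// `DBMap` table.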
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    pub ignore_range_deletions: bool,
    sync_to_disk: bool,
}

impl ReadWriteOptions {
    pub fn readopts(&self) -> ReadOptions {
        let mut readopts = ReadOptions::default();
        readopts.set_ignore_range_deletions(self.ignore_range_deletions);
        readopts
    }

    pub fn writeopts(&self) -> WriteOptions {
        let mut opts = WriteOptions::default();
        opts.set_sync(self.sync_to_disk);
        opts
    }

    pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
        self.ignore_range_deletions = ignore;
        self
    }
}

impl Default for ReadWriteOptions {
    fn default() -> Self {
        Self {
            ignore_range_deletions: true,
            sync_to_disk: std::env::var("IOTA_DB_SYNC_TO_DISK").is_ok_and(|v| v != "0"),
        }
    }
}
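
/// Options to apply to a column family on open: the RocksDB options for the
/// table itself plus the read/write options used for operations against it.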
#[derive(Default, Clone)]
pub struct DBOptions {
    pub options: rocksdb::Options,
    pub rw_options: ReadWriteOptions,
}

impl DBOptions {
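    /// Optimizes the table for point lookups at the expense of range scans,
    /// backed by a block cache of the given size.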
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }

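    /// Optimizes the table for infrequent scans over large values by moving
    /// values above `min_blob_size` into separate blob files. Can be disabled
    /// with the `DISABLE_BLOB_STORAGE` environment variable.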
    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
            info!("Large value blob storage optimization is disabled via env var.");
            return self;
        }

        self.options.set_enable_blob_files(true);
        self.options
            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
        self.options.set_enable_blob_gc(true);
        self.options.set_min_blob_size(min_blob_size);

        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let target_file_size_base = 64 << 20;
        self.options
            .set_target_file_size_base(target_file_size_base);
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options
            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);

        self
    }

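    /// Optimizes the table for reads by giving it a dedicated block cache of
    /// the given size.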
    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
        self
    }

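    /// Optimizes the whole database for write throughput by capping the total
    /// memtable and WAL footprint at the given number of gigabytes.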
    pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
        self.options
            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
        self.options
            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
        self
    }

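    /// Optimizes the table for write throughput: larger and more numerous
    /// memtables, with the L0 compaction triggers scaled to match. Most knobs
    /// can be overridden through environment variables.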
    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

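    /// Like `optimize_for_write_throughput`, but for tables whose keys are
    /// never deleted: uses universal compaction to reduce write amplification.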
    pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        self.options
            .set_max_write_buffer_size_to_maintain(write_buffer_size.try_into().unwrap());

        self.options
            .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
        let mut compaction_options = rocksdb::UniversalCompactOptions::default();
        compaction_options.set_max_size_amplification_percent(10000);
        compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
        self.options
            .set_universal_compaction_options(&compaction_options);

        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

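    /// Overrides the block options with the given block cache size and block
    /// size.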
    pub fn set_block_options(
        mut self,
        block_cache_size_mb: usize,
        block_size_bytes: usize,
    ) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(
                block_cache_size_mb,
                block_size_bytes,
            ));
        self
    }

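    /// Disables write throttling by lifting the soft and hard pending
    /// compaction byte limits.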
    pub fn disable_write_throttling(mut self) -> DBOptions {
        self.options.set_soft_pending_compaction_bytes_limit(0);
        self.options.set_hard_pending_compaction_bytes_limit(0);
        self
    }
}

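/// Creates the default RocksDB database options, tuned via the environment
/// variables defined above where they are set.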
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    opt.set_table_cache_num_shard_bits(10);

    opt.set_min_level_to_compress(2);
    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}

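/// Builds the block-based table options used by the helpers above: block size,
/// an LRU block cache, a bloom filter, and L0 filter/index blocks pinned in
/// cache.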
fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
    let mut block_options = BlockBasedOptions::default();
    block_options.set_block_size(block_size_bytes);
    block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
    block_options.set_bloom_filter(10.0, false);
    block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
    block_options
}

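/// Opens a database with the given path and column families, applying the same
/// options to the database and every column family.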
#[instrument(level = "debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
pub fn open_cf<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[&str],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let options = db_options.unwrap_or_else(|| default_db_options().options);
    let column_descriptors: Vec<_> = opt_cfs
        .iter()
        .map(|name| (*name, options.clone()))
        .collect();
    open_cf_opts(
        path,
        Some(options.clone()),
        metric_conf,
        &column_descriptors[..],
    )
}

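/// Prepares the database options for opening: missing databases and column
/// families are created on open.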
fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
    let mut options = db_options.unwrap_or_else(|| default_db_options().options);
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    options
}

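/// Opens a database with the given path and column families, each with its own
/// options.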
#[instrument(level = "debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

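/// Opens a database as an `OptimisticTransactionDB`, with the given path and
/// column families, each with its own options.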
#[instrument(level = "debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts_transactional<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let options = prepare_db_options(db_options);
        let rocksdb = rocksdb::OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
            &options,
            path,
            cfs.into_iter()
                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
        )
        .map_err(typed_store_err_from_rocks_err)?;
        Ok(Arc::new(RocksDB::OptimisticTransactionDB(
            OptimisticTransactionDBWrapper::new(rocksdb, metric_conf, PathBuf::from(path)),
        )))
    })
}

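/// Opens a database in read-only (secondary) mode. If `secondary_path` is not
/// given, a `SECONDARY` directory is used next to the primary path.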
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<RocksDB>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(RocksDB::DBWithThreadMode(
            DBWithThreadModeWrapper::new(rocksdb, metric_conf, secondary_path),
        )))
    })
}

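/// Lists the column families (tables) of the database at `path`, excluding the
/// default column family that RocksDB creates implicitly.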
pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
    const DB_DEFAULT_CF_NAME: &str = "default";

    let opts = rocksdb::Options::default();
    rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
        .map_err(|e| e.into())
        .map(|q| {
            q.iter()
                .filter_map(|s| {
                    if s != DB_DEFAULT_CF_NAME {
                        Some(s.clone())
                    } else {
                        None
                    }
                })
                .collect()
        })
}

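/// Serializes a value with bincode using big-endian, fixed-width integer
/// encoding, so that the byte order of serialized keys matches their natural
/// ordering.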
#[inline]
pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
where
    S: ?Sized + serde::Serialize,
{
    bincode::DefaultOptions::new()
        .with_big_endian()
        .with_fixint_encoding()
        .serialize(t)
        .map_err(typed_store_err_from_bincode_err)
}

#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
        Self(map)
    }

    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
        self.0.clone()
    }
}

pub enum RocksDBAccessType {
    Primary,
    Secondary(Option<PathBuf>),
}

pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
    rocksdb::DB::destroy(&rocksdb::Options::default(), path)
}

fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

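/// Adds one to a big-endian encoded integer in place, saturating at the
/// all-`0xFF` maximum instead of wrapping around.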
fn big_endian_saturating_add_one(v: &mut [u8]) {
    if is_max(v) {
        return;
    }
    for i in (0..v.len()).rev() {
        if v[i] == u8::MAX {
            v[i] = 0;
        } else {
            v[i] += 1;
            break;
        }
    }
}

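/// Returns whether every byte is `0xFF`, i.e. the value is at its maximum for
/// its width.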
fn is_max(v: &[u8]) -> bool {
    v.iter().all(|&x| x == u8::MAX)
}

#[expect(clippy::assign_op_pattern, clippy::manual_div_ceil)]
#[test]
fn test_helpers() {
    let v = vec![];
    assert!(is_max(&v));

    fn check_add(v: Vec<u8>) {
        let mut v = v;
        let num = Num32::from_big_endian(&v);
        big_endian_saturating_add_one(&mut v);
        assert!(num + 1 == Num32::from_big_endian(&v));
    }

    uint::construct_uint! {
        struct Num32(4);
    }

    let mut v = vec![255; 32];
    big_endian_saturating_add_one(&mut v);
    assert!(Num32::MAX == Num32::from_big_endian(&v));

    check_add(vec![1; 32]);
    check_add(vec![6; 32]);
    check_add(vec![254; 32]);
}