typed_store/rocks/
iter.rs1use std::{marker::PhantomData, sync::Arc};
6
7use bincode::Options;
8use prometheus::{Histogram, HistogramTimer};
9use rocksdb::Direction;
10use serde::de::DeserializeOwned;
11
12use super::RocksDBRawIter;
13use crate::{DBMetrics, metrics::RocksDBPerfContext};
14
15pub struct Iter<'a, K, V> {
17 cf_name: String,
18 db_iter: RocksDBRawIter<'a>,
19 _phantom: PhantomData<*const (K, V)>,
22 direction: Direction,
23 is_initialized: bool,
24 _timer: Option<HistogramTimer>,
25 _perf_ctx: Option<RocksDBPerfContext>,
26 bytes_scanned: Option<Histogram>,
27 keys_scanned: Option<Histogram>,
28 db_metrics: Option<Arc<DBMetrics>>,
29 bytes_scanned_counter: usize,
30 keys_returned_counter: usize,
31}
32
33impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iter<'a, K, V> {
34 pub(super) fn new(
35 cf_name: String,
36 db_iter: RocksDBRawIter<'a>,
37 _timer: Option<HistogramTimer>,
38 _perf_ctx: Option<RocksDBPerfContext>,
39 bytes_scanned: Option<Histogram>,
40 keys_scanned: Option<Histogram>,
41 db_metrics: Option<Arc<DBMetrics>>,
42 ) -> Self {
43 Self {
44 cf_name,
45 db_iter,
46 _phantom: PhantomData,
47 direction: Direction::Forward,
48 is_initialized: false,
49 _timer,
50 _perf_ctx,
51 bytes_scanned,
52 keys_scanned,
53 db_metrics,
54 bytes_scanned_counter: 0,
55 keys_returned_counter: 0,
56 }
57 }
58}
59
60impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for Iter<'_, K, V> {
61 type Item = (K, V);
62
63 fn next(&mut self) -> Option<Self::Item> {
64 if !self.is_initialized {
67 self.db_iter.seek_to_first();
68 self.is_initialized = true;
69 }
70 if self.db_iter.valid() {
71 let config = bincode::DefaultOptions::new()
72 .with_big_endian()
73 .with_fixint_encoding();
74 let raw_key = self
75 .db_iter
76 .key()
77 .expect("Valid iterator failed to get key");
78 let raw_value = self
79 .db_iter
80 .value()
81 .expect("Valid iterator failed to get value");
82 self.bytes_scanned_counter += raw_key.len() + raw_value.len();
83 self.keys_returned_counter += 1;
84 let key = config.deserialize(raw_key).ok();
85 let value = bcs::from_bytes(raw_value).ok();
86 match self.direction {
87 Direction::Forward => self.db_iter.next(),
88 Direction::Reverse => self.db_iter.prev(),
89 }
90 key.and_then(|k| value.map(|v| (k, v)))
91 } else {
92 None
93 }
94 }
95}
96
97impl<K, V> Drop for Iter<'_, K, V> {
98 fn drop(&mut self) {
99 if let Some(bytes_scanned) = self.bytes_scanned.take() {
100 bytes_scanned.observe(self.bytes_scanned_counter as f64);
101 }
102 if let Some(keys_scanned) = self.keys_scanned.take() {
103 keys_scanned.observe(self.keys_returned_counter as f64);
104 }
105 if let Some(db_metrics) = self.db_metrics.take() {
106 db_metrics
107 .read_perf_ctx_metrics
108 .report_metrics(&self.cf_name);
109 }
110 }
111}