typed_store/rocks/
safe_iter.rs1use std::{marker::PhantomData, sync::Arc};
6
7use bincode::Options;
8use prometheus::{Histogram, HistogramTimer};
9use rocksdb::Direction;
10use serde::{Serialize, de::DeserializeOwned};
11
12use super::{RocksDBRawIter, TypedStoreError, be_fix_int_ser};
13use crate::metrics::{DBMetrics, RocksDBPerfContext};
14
15pub struct SafeIter<'a, K, V> {
17 cf_name: String,
18 db_iter: RocksDBRawIter<'a>,
19 _phantom: PhantomData<(K, V)>,
20 direction: Direction,
21 is_initialized: bool,
22 _timer: Option<HistogramTimer>,
23 _perf_ctx: Option<RocksDBPerfContext>,
24 bytes_scanned: Option<Histogram>,
25 keys_scanned: Option<Histogram>,
26 db_metrics: Option<Arc<DBMetrics>>,
27 bytes_scanned_counter: usize,
28 keys_returned_counter: usize,
29}
30
31impl<'a, K: DeserializeOwned, V: DeserializeOwned> SafeIter<'a, K, V> {
32 pub(super) fn new(
33 cf_name: String,
34 db_iter: RocksDBRawIter<'a>,
35 _timer: Option<HistogramTimer>,
36 _perf_ctx: Option<RocksDBPerfContext>,
37 bytes_scanned: Option<Histogram>,
38 keys_scanned: Option<Histogram>,
39 db_metrics: Option<Arc<DBMetrics>>,
40 ) -> Self {
41 Self {
42 cf_name,
43 db_iter,
44 _phantom: PhantomData,
45 direction: Direction::Forward,
46 is_initialized: false,
47 _timer,
48 _perf_ctx,
49 bytes_scanned,
50 keys_scanned,
51 db_metrics,
52 bytes_scanned_counter: 0,
53 keys_returned_counter: 0,
54 }
55 }
56}
57
58impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SafeIter<'_, K, V> {
59 type Item = Result<(K, V), TypedStoreError>;
60
61 fn next(&mut self) -> Option<Self::Item> {
62 if !self.is_initialized {
65 self.db_iter.seek_to_first();
66 self.is_initialized = true;
67 }
68 if self.db_iter.valid() {
69 let config = bincode::DefaultOptions::new()
70 .with_big_endian()
71 .with_fixint_encoding();
72 let raw_key = self
73 .db_iter
74 .key()
75 .expect("Valid iterator failed to get key");
76 let raw_value = self
77 .db_iter
78 .value()
79 .expect("Valid iterator failed to get value");
80 self.bytes_scanned_counter += raw_key.len() + raw_value.len();
81 self.keys_returned_counter += 1;
82 let key = config.deserialize(raw_key).ok();
83 let value = bcs::from_bytes(raw_value).ok();
84 match self.direction {
85 Direction::Forward => self.db_iter.next(),
86 Direction::Reverse => self.db_iter.prev(),
87 }
88 key.and_then(|k| value.map(|v| Ok((k, v))))
89 } else {
90 match self.db_iter.status() {
91 Ok(_) => None,
92 Err(err) => Some(Err(TypedStoreError::RocksDB(format!("{err}")))),
93 }
94 }
95 }
96}
97
98impl<K, V> Drop for SafeIter<'_, K, V> {
99 fn drop(&mut self) {
100 if let Some(bytes_scanned) = self.bytes_scanned.take() {
101 bytes_scanned.observe(self.bytes_scanned_counter as f64);
102 }
103 if let Some(keys_scanned) = self.keys_scanned.take() {
104 keys_scanned.observe(self.keys_returned_counter as f64);
105 }
106 if let Some(db_metrics) = self.db_metrics.take() {
107 db_metrics
108 .read_perf_ctx_metrics
109 .report_metrics(&self.cf_name);
110 }
111 }
112}
113
114impl<'a, K: Serialize, V> SafeIter<'a, K, V> {
115 pub fn skip_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
119 self.db_iter.seek(be_fix_int_ser(key)?);
120 self.is_initialized = true;
121 Ok(self)
122 }
123
124 pub fn skip_prior_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
128 self.db_iter.seek_for_prev(be_fix_int_ser(key)?);
129 self.is_initialized = true;
130 Ok(self)
131 }
132
133 pub fn skip_to_last(mut self) -> Self {
135 self.db_iter.seek_to_last();
136 self.is_initialized = true;
137 self
138 }
139
140 pub fn reverse(mut self) -> SafeRevIter<'a, K, V> {
144 self.direction = Direction::Reverse;
145 SafeRevIter::new(self)
146 }
147}
148
149pub struct SafeRevIter<'a, K, V> {
154 iter: SafeIter<'a, K, V>,
155}
156
157impl<'a, K, V> SafeRevIter<'a, K, V> {
158 fn new(iter: SafeIter<'a, K, V>) -> Self {
159 Self { iter }
160 }
161}
162
163impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SafeRevIter<'_, K, V> {
164 type Item = Result<(K, V), TypedStoreError>;
165
166 fn next(&mut self) -> Option<Self::Item> {
168 self.iter.next()
169 }
170}