typed_store/rocks/
iter.rs1use std::{marker::PhantomData, sync::Arc};
6
7use bincode::Options;
8use prometheus::{Histogram, HistogramTimer};
9use rocksdb::Direction;
10use serde::{Serialize, de::DeserializeOwned};
11
12use super::{RocksDBRawIter, TypedStoreError, be_fix_int_ser};
13use crate::{DBMetrics, metrics::RocksDBPerfContext};
14
15pub struct Iter<'a, K, V> {
17 cf_name: String,
18 db_iter: RocksDBRawIter<'a>,
19 _phantom: PhantomData<*const (K, V)>,
22 direction: Direction,
23 is_initialized: bool,
24 _timer: Option<HistogramTimer>,
25 _perf_ctx: Option<RocksDBPerfContext>,
26 bytes_scanned: Option<Histogram>,
27 keys_scanned: Option<Histogram>,
28 db_metrics: Option<Arc<DBMetrics>>,
29 bytes_scanned_counter: usize,
30 keys_returned_counter: usize,
31}
32
33impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iter<'a, K, V> {
34 pub(super) fn new(
35 cf_name: String,
36 db_iter: RocksDBRawIter<'a>,
37 _timer: Option<HistogramTimer>,
38 _perf_ctx: Option<RocksDBPerfContext>,
39 bytes_scanned: Option<Histogram>,
40 keys_scanned: Option<Histogram>,
41 db_metrics: Option<Arc<DBMetrics>>,
42 ) -> Self {
43 Self {
44 cf_name,
45 db_iter,
46 _phantom: PhantomData,
47 direction: Direction::Forward,
48 is_initialized: false,
49 _timer,
50 _perf_ctx,
51 bytes_scanned,
52 keys_scanned,
53 db_metrics,
54 bytes_scanned_counter: 0,
55 keys_returned_counter: 0,
56 }
57 }
58}
59
60impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for Iter<'_, K, V> {
61 type Item = (K, V);
62
63 fn next(&mut self) -> Option<Self::Item> {
64 if !self.is_initialized {
67 self.db_iter.seek_to_first();
68 self.is_initialized = true;
69 }
70 if self.db_iter.valid() {
71 let config = bincode::DefaultOptions::new()
72 .with_big_endian()
73 .with_fixint_encoding();
74 let raw_key = self
75 .db_iter
76 .key()
77 .expect("Valid iterator failed to get key");
78 let raw_value = self
79 .db_iter
80 .value()
81 .expect("Valid iterator failed to get value");
82 self.bytes_scanned_counter += raw_key.len() + raw_value.len();
83 self.keys_returned_counter += 1;
84 let key = config.deserialize(raw_key).ok();
85 let value = bcs::from_bytes(raw_value).ok();
86 match self.direction {
87 Direction::Forward => self.db_iter.next(),
88 Direction::Reverse => self.db_iter.prev(),
89 }
90 key.and_then(|k| value.map(|v| (k, v)))
91 } else {
92 None
93 }
94 }
95}
96
97impl<K, V> Drop for Iter<'_, K, V> {
98 fn drop(&mut self) {
99 if let Some(bytes_scanned) = self.bytes_scanned.take() {
100 bytes_scanned.observe(self.bytes_scanned_counter as f64);
101 }
102 if let Some(keys_scanned) = self.keys_scanned.take() {
103 keys_scanned.observe(self.keys_returned_counter as f64);
104 }
105 if let Some(db_metrics) = self.db_metrics.take() {
106 db_metrics
107 .read_perf_ctx_metrics
108 .report_metrics(&self.cf_name);
109 }
110 }
111}
112
113impl<'a, K: Serialize, V> Iter<'a, K, V> {
114 pub fn skip_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
118 self.is_initialized = true;
119 self.db_iter.seek(be_fix_int_ser(key)?);
120 Ok(self)
121 }
122
123 pub fn skip_prior_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
127 self.is_initialized = true;
128 self.db_iter.seek_for_prev(be_fix_int_ser(key)?);
129 Ok(self)
130 }
131
132 pub fn skip_to_last(mut self) -> Self {
134 self.is_initialized = true;
135 self.db_iter.seek_to_last();
136 self
137 }
138
139 pub fn seek_to_first(mut self) -> Self {
141 self.is_initialized = true;
142 self.db_iter.seek_to_first();
143 self
144 }
145
146 pub fn reverse(mut self) -> RevIter<'a, K, V> {
150 self.direction = Direction::Reverse;
151 RevIter::new(self)
152 }
153}
154
155pub struct RevIter<'a, K, V> {
160 iter: Iter<'a, K, V>,
161}
162
163impl<'a, K, V> RevIter<'a, K, V> {
164 fn new(iter: Iter<'a, K, V>) -> Self {
165 Self { iter }
166 }
167}
168
169impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for RevIter<'_, K, V> {
170 type Item = (K, V);
171
172 fn next(&mut self) -> Option<Self::Item> {
174 self.iter.next()
175 }
176}