typed_store/rocks/
safe_iter.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{marker::PhantomData, sync::Arc};
6
7use bincode::Options;
8use prometheus::{Histogram, HistogramTimer};
9use rocksdb::Direction;
10use serde::{Serialize, de::DeserializeOwned};
11
12use super::{RocksDBRawIter, TypedStoreError, be_fix_int_ser};
13use crate::metrics::{DBMetrics, RocksDBPerfContext};
14
15/// An iterator over all key-value pairs in a data map.
16pub struct SafeIter<'a, K, V> {
17    cf_name: String,
18    db_iter: RocksDBRawIter<'a>,
19    _phantom: PhantomData<(K, V)>,
20    direction: Direction,
21    is_initialized: bool,
22    _timer: Option<HistogramTimer>,
23    _perf_ctx: Option<RocksDBPerfContext>,
24    bytes_scanned: Option<Histogram>,
25    keys_scanned: Option<Histogram>,
26    db_metrics: Option<Arc<DBMetrics>>,
27    bytes_scanned_counter: usize,
28    keys_returned_counter: usize,
29}
30
31impl<'a, K: DeserializeOwned, V: DeserializeOwned> SafeIter<'a, K, V> {
32    pub(super) fn new(
33        cf_name: String,
34        db_iter: RocksDBRawIter<'a>,
35        _timer: Option<HistogramTimer>,
36        _perf_ctx: Option<RocksDBPerfContext>,
37        bytes_scanned: Option<Histogram>,
38        keys_scanned: Option<Histogram>,
39        db_metrics: Option<Arc<DBMetrics>>,
40    ) -> Self {
41        Self {
42            cf_name,
43            db_iter,
44            _phantom: PhantomData,
45            direction: Direction::Forward,
46            is_initialized: false,
47            _timer,
48            _perf_ctx,
49            bytes_scanned,
50            keys_scanned,
51            db_metrics,
52            bytes_scanned_counter: 0,
53            keys_returned_counter: 0,
54        }
55    }
56}
57
58impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SafeIter<'_, K, V> {
59    type Item = Result<(K, V), TypedStoreError>;
60
61    fn next(&mut self) -> Option<Self::Item> {
62        // Implicitly set iterator to the first entry in the column family if it hasn't
63        // been initialized used for backward compatibility
64        if !self.is_initialized {
65            self.db_iter.seek_to_first();
66            self.is_initialized = true;
67        }
68        if self.db_iter.valid() {
69            let config = bincode::DefaultOptions::new()
70                .with_big_endian()
71                .with_fixint_encoding();
72            let raw_key = self
73                .db_iter
74                .key()
75                .expect("Valid iterator failed to get key");
76            let raw_value = self
77                .db_iter
78                .value()
79                .expect("Valid iterator failed to get value");
80            self.bytes_scanned_counter += raw_key.len() + raw_value.len();
81            self.keys_returned_counter += 1;
82            let key = config.deserialize(raw_key).ok();
83            let value = bcs::from_bytes(raw_value).ok();
84            match self.direction {
85                Direction::Forward => self.db_iter.next(),
86                Direction::Reverse => self.db_iter.prev(),
87            }
88            key.and_then(|k| value.map(|v| Ok((k, v))))
89        } else {
90            match self.db_iter.status() {
91                Ok(_) => None,
92                Err(err) => Some(Err(TypedStoreError::RocksDB(format!("{err}")))),
93            }
94        }
95    }
96}
97
98impl<K, V> Drop for SafeIter<'_, K, V> {
99    fn drop(&mut self) {
100        if let Some(bytes_scanned) = self.bytes_scanned.take() {
101            bytes_scanned.observe(self.bytes_scanned_counter as f64);
102        }
103        if let Some(keys_scanned) = self.keys_scanned.take() {
104            keys_scanned.observe(self.keys_returned_counter as f64);
105        }
106        if let Some(db_metrics) = self.db_metrics.take() {
107            db_metrics
108                .read_perf_ctx_metrics
109                .report_metrics(&self.cf_name);
110        }
111    }
112}
113
114impl<'a, K: Serialize, V> SafeIter<'a, K, V> {
115    /// Skips all the elements that are smaller than the given key,
116    /// and either lands on the key or the first one greater than
117    /// the key.
118    pub fn skip_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
119        self.db_iter.seek(be_fix_int_ser(key)?);
120        self.is_initialized = true;
121        Ok(self)
122    }
123
124    /// Moves the iterator the element given or
125    /// the one prior to it if it does not exist. If there is
126    /// no element prior to it, it returns an empty iterator.
127    pub fn skip_prior_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
128        self.db_iter.seek_for_prev(be_fix_int_ser(key)?);
129        self.is_initialized = true;
130        Ok(self)
131    }
132
133    /// Seeks to the last key in the database (at this column family).
134    pub fn skip_to_last(mut self) -> Self {
135        self.db_iter.seek_to_last();
136        self.is_initialized = true;
137        self
138    }
139
140    /// Will make the direction of the iteration reverse and will
141    /// create a new `RevIter` to consume. Every call to `next` method
142    /// will give the next element from the end.
143    pub fn reverse(mut self) -> SafeRevIter<'a, K, V> {
144        self.direction = Direction::Reverse;
145        SafeRevIter::new(self)
146    }
147}
148
149/// An iterator with a reverted direction to the original. The `RevIter`
150/// is hosting an iteration which is consuming in the opposing direction.
151/// It's not possible to do further manipulation (ex re-reverse) to the
152/// iterator.
153pub struct SafeRevIter<'a, K, V> {
154    iter: SafeIter<'a, K, V>,
155}
156
157impl<'a, K, V> SafeRevIter<'a, K, V> {
158    fn new(iter: SafeIter<'a, K, V>) -> Self {
159        Self { iter }
160    }
161}
162
163impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SafeRevIter<'_, K, V> {
164    type Item = Result<(K, V), TypedStoreError>;
165
166    /// Will give the next item backwards
167    fn next(&mut self) -> Option<Self::Item> {
168        self.iter.next()
169    }
170}