typed_store/rocks/
iter.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{marker::PhantomData, sync::Arc};
6
7use bincode::Options;
8use prometheus::{Histogram, HistogramTimer};
9use rocksdb::Direction;
10use serde::{Serialize, de::DeserializeOwned};
11
12use super::{RocksDBRawIter, TypedStoreError, be_fix_int_ser};
13use crate::{DBMetrics, metrics::RocksDBPerfContext};
14
15/// An iterator over all key-value pairs in a data map.
16pub struct Iter<'a, K, V> {
17    cf_name: String,
18    db_iter: RocksDBRawIter<'a>,
19    // *const here is an equivalent to `impl !Send for Iter` (which is not a stable feature at the
20    // moment)
21    _phantom: PhantomData<*const (K, V)>,
22    direction: Direction,
23    is_initialized: bool,
24    _timer: Option<HistogramTimer>,
25    _perf_ctx: Option<RocksDBPerfContext>,
26    bytes_scanned: Option<Histogram>,
27    keys_scanned: Option<Histogram>,
28    db_metrics: Option<Arc<DBMetrics>>,
29    bytes_scanned_counter: usize,
30    keys_returned_counter: usize,
31}
32
33impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iter<'a, K, V> {
34    pub(super) fn new(
35        cf_name: String,
36        db_iter: RocksDBRawIter<'a>,
37        _timer: Option<HistogramTimer>,
38        _perf_ctx: Option<RocksDBPerfContext>,
39        bytes_scanned: Option<Histogram>,
40        keys_scanned: Option<Histogram>,
41        db_metrics: Option<Arc<DBMetrics>>,
42    ) -> Self {
43        Self {
44            cf_name,
45            db_iter,
46            _phantom: PhantomData,
47            direction: Direction::Forward,
48            is_initialized: false,
49            _timer,
50            _perf_ctx,
51            bytes_scanned,
52            keys_scanned,
53            db_metrics,
54            bytes_scanned_counter: 0,
55            keys_returned_counter: 0,
56        }
57    }
58}
59
60impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for Iter<'_, K, V> {
61    type Item = (K, V);
62
63    fn next(&mut self) -> Option<Self::Item> {
64        // implicitly set iterator to the first entry in the column family if it hasn't
65        // been initialized used for backward compatibility
66        if !self.is_initialized {
67            self.db_iter.seek_to_first();
68            self.is_initialized = true;
69        }
70        if self.db_iter.valid() {
71            let config = bincode::DefaultOptions::new()
72                .with_big_endian()
73                .with_fixint_encoding();
74            let raw_key = self
75                .db_iter
76                .key()
77                .expect("Valid iterator failed to get key");
78            let raw_value = self
79                .db_iter
80                .value()
81                .expect("Valid iterator failed to get value");
82            self.bytes_scanned_counter += raw_key.len() + raw_value.len();
83            self.keys_returned_counter += 1;
84            let key = config.deserialize(raw_key).ok();
85            let value = bcs::from_bytes(raw_value).ok();
86            match self.direction {
87                Direction::Forward => self.db_iter.next(),
88                Direction::Reverse => self.db_iter.prev(),
89            }
90            key.and_then(|k| value.map(|v| (k, v)))
91        } else {
92            None
93        }
94    }
95}
96
97impl<K, V> Drop for Iter<'_, K, V> {
98    fn drop(&mut self) {
99        if let Some(bytes_scanned) = self.bytes_scanned.take() {
100            bytes_scanned.observe(self.bytes_scanned_counter as f64);
101        }
102        if let Some(keys_scanned) = self.keys_scanned.take() {
103            keys_scanned.observe(self.keys_returned_counter as f64);
104        }
105        if let Some(db_metrics) = self.db_metrics.take() {
106            db_metrics
107                .read_perf_ctx_metrics
108                .report_metrics(&self.cf_name);
109        }
110    }
111}
112
113impl<'a, K: Serialize, V> Iter<'a, K, V> {
114    /// Skips all the elements that are smaller than the given key,
115    /// and either lands on the key or the first one greater than
116    /// the key.
117    pub fn skip_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
118        self.is_initialized = true;
119        self.db_iter.seek(be_fix_int_ser(key)?);
120        Ok(self)
121    }
122
123    /// Moves the iterator the element given or
124    /// the one prior to it if it does not exist. If there is
125    /// no element prior to it, it returns an empty iterator.
126    pub fn skip_prior_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
127        self.is_initialized = true;
128        self.db_iter.seek_for_prev(be_fix_int_ser(key)?);
129        Ok(self)
130    }
131
132    /// Seeks to the last key in the database (at this column family).
133    pub fn skip_to_last(mut self) -> Self {
134        self.is_initialized = true;
135        self.db_iter.seek_to_last();
136        self
137    }
138
139    /// Seeks to the first key in the database (at this column family).
140    pub fn seek_to_first(mut self) -> Self {
141        self.is_initialized = true;
142        self.db_iter.seek_to_first();
143        self
144    }
145
146    /// Will make the direction of the iteration reverse and will
147    /// create a new `RevIter` to consume. Every call to `next` method
148    /// will give the next element from the end.
149    pub fn reverse(mut self) -> RevIter<'a, K, V> {
150        self.direction = Direction::Reverse;
151        RevIter::new(self)
152    }
153}
154
155/// An iterator with a reverted direction to the original. The `RevIter`
156/// is hosting an iteration which is consuming in the opposing direction.
157/// It's not possible to do further manipulation (ex re-reverse) to the
158/// iterator.
159pub struct RevIter<'a, K, V> {
160    iter: Iter<'a, K, V>,
161}
162
163impl<'a, K, V> RevIter<'a, K, V> {
164    fn new(iter: Iter<'a, K, V>) -> Self {
165        Self { iter }
166    }
167}
168
169impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for RevIter<'_, K, V> {
170    type Item = (K, V);
171
172    /// Will give the next item backwards
173    fn next(&mut self) -> Option<Self::Item> {
174        self.iter.next()
175    }
176}