typed_store/sally/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! Storage Atomicity Layer Library (aka Sally) is a wrapper around pluggable
6//! storage backends which implement a common key value interface. It enables
7//! users to switch storage backends in their code with simple options. It is
8//! also designed to be able to support atomic operations across different
9//! columns of the db even when they are backed by different storage instances.
10//!
11//! # Examples
12//!
13//! ```
14//! use typed_store::{
15//!     SallyDB,
16//!     rocks::*,
17//!     sally::{SallyColumn, SallyDBOptions},
18//!     test_db::*,
19//!     traits::{TableSummary, TypedStoreDebug},
20//!     *,
21//! };
22//!
23//! use crate::typed_store::Map;
24//!
25//! // `ExampleTable` is a sally db instance where each column is first initialized with TestDB
26//! // (btree map) backend and later switched to a RocksDB column family
27//!
28//! #[derive(SallyDB)]
29//! pub struct ExampleTable {
30//!     col1: SallyColumn<String, String>,
31//!     col2: SallyColumn<i32, String>,
32//! }
33//!
34//! async fn insert_key_vals(table: &ExampleTable) {
35//!     // create a write batch and do atomic commit across columns in the table
36//!     let keys_vals = (1..100).map(|i| (i, i.to_string()));
37//!     let mut wb = table.col1.batch();
38//!     wb.insert_batch(&table.col2, keys_vals)
39//!         .expect("Failed to batch insert");
40//!     wb.delete_range(&table.col2, &50, &100)
41//!         .expect("Failed to batch delete");
42//!     wb.write().await.expect("Failed to commit batch");
43//! }
44//!
45//! #[tokio::main]
46//! async fn main() -> Result<(), TypedStoreError> {
47//!     // use a btree map backend first
48//!     let mut table = ExampleTable::init(SallyDBOptions::TestDB);
49//!     insert_key_vals(&table).await;
50//!     // switch to rocksdb backend
51//!     let primary_path = tempfile::tempdir()
52//!         .expect("Failed to open db path")
53//!         .into_path();
54//!     table = ExampleTable::init(SallyDBOptions::RocksDB((
55//!         primary_path,
56//!         MetricConf::default(),
57//!         RocksDBAccessType::Primary,
58//!         None,
59//!         None,
60//!     )));
61//!     insert_key_vals(&table).await;
62//!     Ok(())
63//! }
64//! ```
65use std::{borrow::Borrow, collections::BTreeMap, path::PathBuf};
66
67use async_trait::async_trait;
68use collectable::TryExtend;
69use rocksdb::Options;
70use serde::{Serialize, de::DeserializeOwned};
71
72use crate::{
73    TypedStoreError,
74    rocks::{
75        DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, RocksDBAccessType,
76        default_db_options,
77        keys::Keys,
78        safe_iter::{SafeIter as RocksDBIter, SafeRevIter},
79        values::Values,
80    },
81    test_db::{TestDB, TestDBIter, TestDBKeys, TestDBRevIter, TestDBValues, TestDBWriteBatch},
82    traits::{AsyncMap, Map},
83};
84
/// Selects how Sally serves reads and writes for a column: either through its
/// own memtable and WAL, or by falling back to reading/writing directly from
/// the backend db.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SallyRunMode {
    /// Read from and write to the backend db directly.
    ///
    /// When columns in the db are backed by different backend stores,
    /// `FallbackToDB` must never be used, as that would lose atomicity,
    /// transactions and db recovery.
    #[default]
    FallbackToDB,
}

/// Per-column Sally configuration.
///
/// The default configuration uses [`SallyRunMode::FallbackToDB`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SallyConfig {
    /// How the column serves reads and writes.
    pub mode: SallyRunMode,
}
104
105/// A Sally column could be anything that implements key value interface. We
106/// will eventually have Sally serve read/writes using its own memtable and wal
107/// when columns in the db are backend by more then one backend store (e.g
108/// different rocksdb instances and/or distributed key value stores)
109pub enum SallyColumn<K, V> {
110    RocksDB((DBMap<K, V>, SallyConfig)),
111    TestDB((TestDB<K, V>, SallyConfig)),
112}
113
114impl<K, V> SallyColumn<K, V> {
115    pub fn new_single_rocksdb(db: DBMap<K, V>) -> Self {
116        // When all columns in the db are backed by a single rocksdb instance, we will
117        // fallback to using native rocksdb read and write apis and use default
118        // config
119        SallyColumn::RocksDB((db, SallyConfig::default()))
120    }
121    pub fn new_testdb(db: TestDB<K, V>) -> Self {
122        SallyColumn::TestDB((db, SallyConfig::default()))
123    }
124    pub fn batch(&self) -> SallyWriteBatch {
125        match self {
126            SallyColumn::RocksDB((
127                db_map,
128                SallyConfig {
129                    mode: SallyRunMode::FallbackToDB,
130                },
131            )) => SallyWriteBatch::RocksDB(db_map.batch()),
132            SallyColumn::TestDB((
133                test_db,
134                SallyConfig {
135                    mode: SallyRunMode::FallbackToDB,
136                },
137            )) => SallyWriteBatch::TestDB(test_db.batch()),
138        }
139    }
140}
141
142#[async_trait]
143impl<'a, K, V> AsyncMap<'a, K, V> for SallyColumn<K, V>
144where
145    K: Serialize + DeserializeOwned + std::marker::Sync,
146    V: Serialize + DeserializeOwned + std::marker::Sync + std::marker::Send,
147{
148    type Error = TypedStoreError;
149    type Iterator = SallyIter<'a, K, V>;
150    type Keys = SallyKeys<'a, K>;
151    type Values = SallyValues<'a, V>;
152
153    async fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
154        match self {
155            SallyColumn::RocksDB((
156                db_map,
157                SallyConfig {
158                    mode: SallyRunMode::FallbackToDB,
159                },
160            )) => db_map.contains_key(key),
161            SallyColumn::TestDB((
162                test_db,
163                SallyConfig {
164                    mode: SallyRunMode::FallbackToDB,
165                },
166            )) => test_db.contains_key(key),
167        }
168    }
169    async fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
170        match self {
171            SallyColumn::RocksDB((
172                db_map,
173                SallyConfig {
174                    mode: SallyRunMode::FallbackToDB,
175                },
176            )) => db_map.get(key),
177            SallyColumn::TestDB((
178                test_db,
179                SallyConfig {
180                    mode: SallyRunMode::FallbackToDB,
181                },
182            )) => test_db.get(key),
183        }
184    }
185    async fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
186        match self {
187            SallyColumn::RocksDB((
188                db_map,
189                SallyConfig {
190                    mode: SallyRunMode::FallbackToDB,
191                },
192            )) => db_map.get_raw_bytes(key),
193            SallyColumn::TestDB((
194                test_db,
195                SallyConfig {
196                    mode: SallyRunMode::FallbackToDB,
197                },
198            )) => test_db.get_raw_bytes(key),
199        }
200    }
201    async fn is_empty(&self) -> bool {
202        match self {
203            SallyColumn::RocksDB((
204                db_map,
205                SallyConfig {
206                    mode: SallyRunMode::FallbackToDB,
207                },
208            )) => db_map.is_empty(),
209            SallyColumn::TestDB((
210                test_db,
211                SallyConfig {
212                    mode: SallyRunMode::FallbackToDB,
213                },
214            )) => test_db.is_empty(),
215        }
216    }
217    async fn iter(&'a self) -> Self::Iterator {
218        match self {
219            SallyColumn::RocksDB((
220                db_map,
221                SallyConfig {
222                    mode: SallyRunMode::FallbackToDB,
223                },
224            )) => SallyIter::RocksDB(db_map.safe_iter()),
225            SallyColumn::TestDB((
226                test_db,
227                SallyConfig {
228                    mode: SallyRunMode::FallbackToDB,
229                },
230            )) => SallyIter::TestDB(test_db.safe_iter()),
231        }
232    }
233    async fn keys(&'a self) -> Self::Keys {
234        match self {
235            SallyColumn::RocksDB((
236                db_map,
237                SallyConfig {
238                    mode: SallyRunMode::FallbackToDB,
239                },
240            )) => SallyKeys::RocksDB(db_map.keys()),
241            SallyColumn::TestDB((
242                test_db,
243                SallyConfig {
244                    mode: SallyRunMode::FallbackToDB,
245                },
246            )) => SallyKeys::TestDB(test_db.keys()),
247        }
248    }
249    async fn values(&'a self) -> Self::Values {
250        match self {
251            SallyColumn::RocksDB((
252                db_map,
253                SallyConfig {
254                    mode: SallyRunMode::FallbackToDB,
255                },
256            )) => SallyValues::RocksDB(db_map.values()),
257            SallyColumn::TestDB((
258                test_db,
259                SallyConfig {
260                    mode: SallyRunMode::FallbackToDB,
261                },
262            )) => SallyValues::TestDB(test_db.values()),
263        }
264    }
265    async fn multi_get<J>(
266        &self,
267        keys: impl IntoIterator<Item = J> + std::marker::Send,
268    ) -> Result<Vec<Option<V>>, TypedStoreError>
269    where
270        J: Borrow<K>,
271    {
272        match self {
273            SallyColumn::RocksDB((
274                db_map,
275                SallyConfig {
276                    mode: SallyRunMode::FallbackToDB,
277                },
278            )) => db_map.multi_get(keys),
279            SallyColumn::TestDB((
280                test_db,
281                SallyConfig {
282                    mode: SallyRunMode::FallbackToDB,
283                },
284            )) => test_db.multi_get(keys),
285        }
286    }
287    async fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
288        match self {
289            SallyColumn::RocksDB((
290                db_map,
291                SallyConfig {
292                    mode: SallyRunMode::FallbackToDB,
293                },
294            )) => Ok(db_map.try_catch_up_with_primary()?),
295            SallyColumn::TestDB((
296                test_db,
297                SallyConfig {
298                    mode: SallyRunMode::FallbackToDB,
299                },
300            )) => Ok(test_db.try_catch_up_with_primary()?),
301        }
302    }
303}
304
305impl<J, K, U, V> TryExtend<(J, U)> for SallyColumn<K, V>
306where
307    J: Borrow<K> + std::clone::Clone,
308    U: Borrow<V> + std::clone::Clone,
309    K: Serialize,
310    V: Serialize,
311{
312    type Error = TypedStoreError;
313
314    fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
315    where
316        T: Iterator<Item = (J, U)>,
317    {
318        match self {
319            SallyColumn::RocksDB((
320                db_map,
321                SallyConfig {
322                    mode: SallyRunMode::FallbackToDB,
323                },
324            )) => db_map.try_extend(iter),
325            SallyColumn::TestDB((
326                test_db,
327                SallyConfig {
328                    mode: SallyRunMode::FallbackToDB,
329                },
330            )) => test_db.try_extend(iter),
331        }
332    }
333    fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
334        match self {
335            SallyColumn::RocksDB((
336                db_map,
337                SallyConfig {
338                    mode: SallyRunMode::FallbackToDB,
339                },
340            )) => db_map.try_extend_from_slice(slice),
341            SallyColumn::TestDB((
342                test_db,
343                SallyConfig {
344                    mode: SallyRunMode::FallbackToDB,
345                },
346            )) => test_db.try_extend_from_slice(slice),
347        }
348    }
349}
350
351/// A Sally write batch provides a mutable struct which holds a collection of db
352/// mutation operations and applies them atomically to the db.
353/// Once sally has its own memtable and wal, atomic commits across multiple db
354/// instances will be possible.
355pub enum SallyWriteBatch {
356    // Write batch for RocksDB backend when `fallback_to_db` is set as true
357    RocksDB(DBBatch),
358    // Write batch for btree map based backend
359    TestDB(TestDBWriteBatch),
360}
361
362impl SallyWriteBatch {
363    pub async fn write(self) -> Result<(), TypedStoreError> {
364        match self {
365            SallyWriteBatch::RocksDB(db_batch) => db_batch.write(),
366            SallyWriteBatch::TestDB(write_batch) => write_batch.write(),
367        }
368    }
369    /// Deletes a set of keys given as an iterator
370    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
371        &mut self,
372        db: &SallyColumn<K, V>,
373        purged_vals: impl IntoIterator<Item = J>,
374    ) -> Result<(), TypedStoreError> {
375        match (self, db) {
376            (SallyWriteBatch::RocksDB(db_batch), SallyColumn::RocksDB((db_map, _))) => {
377                db_batch.delete_batch(db_map, purged_vals)
378            }
379            (SallyWriteBatch::TestDB(write_batch), SallyColumn::TestDB((test_db, _))) => {
380                write_batch.delete_batch(test_db, purged_vals)
381            }
382            _ => unimplemented!(),
383        }
384    }
385    /// Deletes a range of keys between `from` (inclusive) and `to`
386    /// (non-inclusive)
387    pub fn delete_range<K: Serialize, V>(
388        &mut self,
389        db: &SallyColumn<K, V>,
390        from: &K,
391        to: &K,
392    ) -> Result<(), TypedStoreError> {
393        match (self, db) {
394            (SallyWriteBatch::RocksDB(db_batch), SallyColumn::RocksDB((db_map, _))) => {
395                db_batch.schedule_delete_range(db_map, from, to)
396            }
397            (SallyWriteBatch::TestDB(write_batch), SallyColumn::TestDB((test_db, _))) => {
398                write_batch.delete_range(test_db, from, to)
399            }
400            _ => unimplemented!(),
401        }
402    }
403    /// inserts a range of (key, value) pairs given as an iterator
404    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
405        &mut self,
406        db: &SallyColumn<K, V>,
407        new_vals: impl IntoIterator<Item = (J, U)>,
408    ) -> Result<(), TypedStoreError> {
409        match (self, db) {
410            (SallyWriteBatch::RocksDB(db_batch), SallyColumn::RocksDB((db_map, _))) => {
411                db_batch.insert_batch(db_map, new_vals)?;
412                Ok(())
413            }
414            (SallyWriteBatch::TestDB(write_batch), SallyColumn::TestDB((test_db, _))) => {
415                write_batch.insert_batch(test_db, new_vals)?;
416                Ok(())
417            }
418            _ => unimplemented!(),
419        }
420    }
421}
422
423/// A SallyIter provides an iterator over all key values in a sally column
424pub enum SallyIter<'a, K, V> {
425    // Iter for a rocksdb backed sally column when `fallback_to_db` is true
426    RocksDB(RocksDBIter<'a, K, V>),
427    TestDB(TestDBIter<'a, K, V>),
428}
429
430impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SallyIter<'_, K, V> {
431    type Item = Result<(K, V), TypedStoreError>;
432    fn next(&mut self) -> Option<Self::Item> {
433        match self {
434            SallyIter::RocksDB(iter) => iter.next(),
435            SallyIter::TestDB(iter) => iter.next(),
436        }
437    }
438}
439
440impl<'a, K: Serialize, V> SallyIter<'a, K, V> {
441    /// Skips all the elements that are smaller than the given key,
442    /// and either lands on the key or the first one greater than
443    /// the key.
444    pub fn skip_to(self, key: &K) -> Result<Self, TypedStoreError> {
445        let iter = match self {
446            SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_to(key)?),
447            SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_to(key)?),
448        };
449        Ok(iter)
450    }
451
452    /// Moves the iterator the element given or
453    /// the one prior to it if it does not exist. If there is
454    /// no element prior to it, it returns an empty iterator.
455    pub fn skip_prior_to(self, key: &K) -> Result<Self, TypedStoreError> {
456        let iter = match self {
457            SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_prior_to(key)?),
458            SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_prior_to(key)?),
459        };
460        Ok(iter)
461    }
462
463    /// Seeks to the last key in the database (at this column family).
464    pub fn skip_to_last(self) -> Self {
465        match self {
466            SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_to_last()),
467            SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_to_last()),
468        }
469    }
470
471    /// Will make the direction of the iteration reverse and will
472    /// create a new `RevIter` to consume. Every call to `next` method
473    /// will give the next element from the end.
474    pub fn reverse(self) -> SallyRevIter<'a, K, V> {
475        match self {
476            SallyIter::RocksDB(iter) => SallyRevIter::RocksDB(iter.reverse()),
477            SallyIter::TestDB(iter) => SallyRevIter::TestDB(iter.reverse()),
478        }
479    }
480}
481
482pub enum SallyRevIter<'a, K, V> {
483    // Iter for a rocksdb backed sally column when `fallback_to_db` is true
484    RocksDB(SafeRevIter<'a, K, V>),
485    TestDB(TestDBRevIter<'a, K, V>),
486}
487
488impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SallyRevIter<'_, K, V> {
489    type Item = Result<(K, V), TypedStoreError>;
490
491    /// Will give the next item backwards
492    fn next(&mut self) -> Option<Self::Item> {
493        match self {
494            SallyRevIter::RocksDB(rev_iter) => rev_iter.next(),
495            SallyRevIter::TestDB(rev_iter) => rev_iter.next(),
496        }
497    }
498}
499
500/// A SallyIter provides an iterator over all keys in a sally column
501pub enum SallyKeys<'a, K> {
502    // Iter for a rocksdb backed sally column when `fallback_to_db` is true
503    RocksDB(Keys<'a, K>),
504    TestDB(TestDBKeys<'a, K>),
505}
506
507impl<K: DeserializeOwned> Iterator for SallyKeys<'_, K> {
508    type Item = Result<K, TypedStoreError>;
509
510    fn next(&mut self) -> Option<Self::Item> {
511        match self {
512            SallyKeys::RocksDB(keys) => keys.next(),
513            SallyKeys::TestDB(iter) => iter.next(),
514        }
515    }
516}
517
518/// A SallyIter provides an iterator over all values in a sally column
519pub enum SallyValues<'a, V> {
520    // Iter for a rocksdb backed sally column when `fallback_to_db` is true
521    RocksDB(Values<'a, V>),
522    TestDB(TestDBValues<'a, V>),
523}
524
525impl<V: DeserializeOwned> Iterator for SallyValues<'_, V> {
526    type Item = Result<V, TypedStoreError>;
527
528    fn next(&mut self) -> Option<Self::Item> {
529        match self {
530            SallyValues::RocksDB(values) => values.next(),
531            SallyValues::TestDB(iter) => iter.next(),
532        }
533    }
534}
535
/// Options to configure a Sally db instance at the global level.
pub enum SallyDBOptions {
    // Options when the Sally db instance is backed by a single RocksDB
    // instance: the db path, the metric configuration, the access type, and
    // optional RocksDB options and optional per-table configuration.
    RocksDB(
        (
            PathBuf,
            MetricConf,
            RocksDBAccessType,
            Option<Options>,
            Option<DBMapTableConfigMap>,
        ),
    ),
    // The Sally db instance is backed by the btree-map based test db.
    TestDB,
}

/// Options to configure a Sally db instance for performing read-only
/// operations at the global level.
pub enum SallyReadOnlyDBOptions {
    // Options when the Sally db instance is backed by a single RocksDB
    // instance. The `Option<PathBuf>` is presumably the secondary db path —
    // TODO confirm against the code that consumes these options.
    RocksDB(Box<(PathBuf, MetricConf, Option<PathBuf>, Option<Options>)>),
    // The Sally db instance is backed by the btree-map based test db.
    TestDB,
}
558
559/// Options to configure an individual column in a sally db instance
560#[derive(Clone)]
561pub enum SallyColumnOptions {
562    // Options to configure a rocksdb column family backed sally column
563    RocksDB(DBOptions),
564    TestDB,
565}
566
567impl SallyColumnOptions {
568    pub fn get_rocksdb_options(&self) -> Option<&DBOptions> {
569        match self {
570            SallyColumnOptions::RocksDB(db_options) => Some(db_options),
571            _ => None,
572        }
573    }
574}
575
576/// Creates a default RocksDB option, to be used when RocksDB option is not
577/// specified..
578pub fn default_column_options() -> SallyColumnOptions {
579    SallyColumnOptions::RocksDB(default_db_options())
580}
581
582#[derive(Clone)]
583pub struct SallyDBConfigMap(BTreeMap<String, SallyColumnOptions>);
584impl SallyDBConfigMap {
585    pub fn new(map: BTreeMap<String, SallyColumnOptions>) -> Self {
586        Self(map)
587    }
588
589    pub fn to_map(&self) -> BTreeMap<String, SallyColumnOptions> {
590        self.0.clone()
591    }
592}