1use std::{borrow::Borrow, collections::BTreeMap, path::PathBuf};
66
67use async_trait::async_trait;
68use collectable::TryExtend;
69use rocksdb::Options;
70use serde::{Serialize, de::DeserializeOwned};
71
72use crate::{
73 TypedStoreError,
74 rocks::{
75 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, RocksDBAccessType,
76 default_db_options,
77 keys::Keys,
78 safe_iter::{SafeIter as RocksDBIter, SafeRevIter},
79 values::Values,
80 },
81 test_db::{TestDB, TestDBIter, TestDBKeys, TestDBRevIter, TestDBValues, TestDBWriteBatch},
82 traits::{AsyncMap, Map},
83};
84
/// Strategy a Sally column uses to serve requests.
///
/// Only one strategy exists here. Dispatch sites in this file match on it
/// explicitly, so adding a variant forces each of them to be revisited.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SallyRunMode {
    /// Every operation is forwarded to the backing database.
    FallbackToDB,
}

/// Per-column configuration stored next to the backend handle.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SallyConfig {
    /// Strategy used when serving requests for the column.
    pub mode: SallyRunMode,
}

impl Default for SallyConfig {
    /// Returns a configuration running in [`SallyRunMode::FallbackToDB`].
    fn default() -> Self {
        Self {
            mode: SallyRunMode::FallbackToDB,
        }
    }
}
104
105pub enum SallyColumn<K, V> {
110 RocksDB((DBMap<K, V>, SallyConfig)),
111 TestDB((TestDB<K, V>, SallyConfig)),
112}
113
114impl<K, V> SallyColumn<K, V> {
115 pub fn new_single_rocksdb(db: DBMap<K, V>) -> Self {
116 SallyColumn::RocksDB((db, SallyConfig::default()))
120 }
121 pub fn new_testdb(db: TestDB<K, V>) -> Self {
122 SallyColumn::TestDB((db, SallyConfig::default()))
123 }
124 pub fn batch(&self) -> SallyWriteBatch {
125 match self {
126 SallyColumn::RocksDB((
127 db_map,
128 SallyConfig {
129 mode: SallyRunMode::FallbackToDB,
130 },
131 )) => SallyWriteBatch::RocksDB(db_map.batch()),
132 SallyColumn::TestDB((
133 test_db,
134 SallyConfig {
135 mode: SallyRunMode::FallbackToDB,
136 },
137 )) => SallyWriteBatch::TestDB(test_db.batch()),
138 }
139 }
140}
141
142#[async_trait]
143impl<'a, K, V> AsyncMap<'a, K, V> for SallyColumn<K, V>
144where
145 K: Serialize + DeserializeOwned + std::marker::Sync,
146 V: Serialize + DeserializeOwned + std::marker::Sync + std::marker::Send,
147{
148 type Error = TypedStoreError;
149 type Iterator = SallyIter<'a, K, V>;
150 type Keys = SallyKeys<'a, K>;
151 type Values = SallyValues<'a, V>;
152
153 async fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
154 match self {
155 SallyColumn::RocksDB((
156 db_map,
157 SallyConfig {
158 mode: SallyRunMode::FallbackToDB,
159 },
160 )) => db_map.contains_key(key),
161 SallyColumn::TestDB((
162 test_db,
163 SallyConfig {
164 mode: SallyRunMode::FallbackToDB,
165 },
166 )) => test_db.contains_key(key),
167 }
168 }
169 async fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
170 match self {
171 SallyColumn::RocksDB((
172 db_map,
173 SallyConfig {
174 mode: SallyRunMode::FallbackToDB,
175 },
176 )) => db_map.get(key),
177 SallyColumn::TestDB((
178 test_db,
179 SallyConfig {
180 mode: SallyRunMode::FallbackToDB,
181 },
182 )) => test_db.get(key),
183 }
184 }
185 async fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
186 match self {
187 SallyColumn::RocksDB((
188 db_map,
189 SallyConfig {
190 mode: SallyRunMode::FallbackToDB,
191 },
192 )) => db_map.get_raw_bytes(key),
193 SallyColumn::TestDB((
194 test_db,
195 SallyConfig {
196 mode: SallyRunMode::FallbackToDB,
197 },
198 )) => test_db.get_raw_bytes(key),
199 }
200 }
201 async fn is_empty(&self) -> bool {
202 match self {
203 SallyColumn::RocksDB((
204 db_map,
205 SallyConfig {
206 mode: SallyRunMode::FallbackToDB,
207 },
208 )) => db_map.is_empty(),
209 SallyColumn::TestDB((
210 test_db,
211 SallyConfig {
212 mode: SallyRunMode::FallbackToDB,
213 },
214 )) => test_db.is_empty(),
215 }
216 }
217 async fn iter(&'a self) -> Self::Iterator {
218 match self {
219 SallyColumn::RocksDB((
220 db_map,
221 SallyConfig {
222 mode: SallyRunMode::FallbackToDB,
223 },
224 )) => SallyIter::RocksDB(db_map.safe_iter()),
225 SallyColumn::TestDB((
226 test_db,
227 SallyConfig {
228 mode: SallyRunMode::FallbackToDB,
229 },
230 )) => SallyIter::TestDB(test_db.safe_iter()),
231 }
232 }
233 async fn keys(&'a self) -> Self::Keys {
234 match self {
235 SallyColumn::RocksDB((
236 db_map,
237 SallyConfig {
238 mode: SallyRunMode::FallbackToDB,
239 },
240 )) => SallyKeys::RocksDB(db_map.keys()),
241 SallyColumn::TestDB((
242 test_db,
243 SallyConfig {
244 mode: SallyRunMode::FallbackToDB,
245 },
246 )) => SallyKeys::TestDB(test_db.keys()),
247 }
248 }
249 async fn values(&'a self) -> Self::Values {
250 match self {
251 SallyColumn::RocksDB((
252 db_map,
253 SallyConfig {
254 mode: SallyRunMode::FallbackToDB,
255 },
256 )) => SallyValues::RocksDB(db_map.values()),
257 SallyColumn::TestDB((
258 test_db,
259 SallyConfig {
260 mode: SallyRunMode::FallbackToDB,
261 },
262 )) => SallyValues::TestDB(test_db.values()),
263 }
264 }
265 async fn multi_get<J>(
266 &self,
267 keys: impl IntoIterator<Item = J> + std::marker::Send,
268 ) -> Result<Vec<Option<V>>, TypedStoreError>
269 where
270 J: Borrow<K>,
271 {
272 match self {
273 SallyColumn::RocksDB((
274 db_map,
275 SallyConfig {
276 mode: SallyRunMode::FallbackToDB,
277 },
278 )) => db_map.multi_get(keys),
279 SallyColumn::TestDB((
280 test_db,
281 SallyConfig {
282 mode: SallyRunMode::FallbackToDB,
283 },
284 )) => test_db.multi_get(keys),
285 }
286 }
287 async fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
288 match self {
289 SallyColumn::RocksDB((
290 db_map,
291 SallyConfig {
292 mode: SallyRunMode::FallbackToDB,
293 },
294 )) => Ok(db_map.try_catch_up_with_primary()?),
295 SallyColumn::TestDB((
296 test_db,
297 SallyConfig {
298 mode: SallyRunMode::FallbackToDB,
299 },
300 )) => Ok(test_db.try_catch_up_with_primary()?),
301 }
302 }
303}
304
305impl<J, K, U, V> TryExtend<(J, U)> for SallyColumn<K, V>
306where
307 J: Borrow<K> + std::clone::Clone,
308 U: Borrow<V> + std::clone::Clone,
309 K: Serialize,
310 V: Serialize,
311{
312 type Error = TypedStoreError;
313
314 fn try_extend<T>(&mut self, iter: &mut T) -> Result<(), Self::Error>
315 where
316 T: Iterator<Item = (J, U)>,
317 {
318 match self {
319 SallyColumn::RocksDB((
320 db_map,
321 SallyConfig {
322 mode: SallyRunMode::FallbackToDB,
323 },
324 )) => db_map.try_extend(iter),
325 SallyColumn::TestDB((
326 test_db,
327 SallyConfig {
328 mode: SallyRunMode::FallbackToDB,
329 },
330 )) => test_db.try_extend(iter),
331 }
332 }
333 fn try_extend_from_slice(&mut self, slice: &[(J, U)]) -> Result<(), Self::Error> {
334 match self {
335 SallyColumn::RocksDB((
336 db_map,
337 SallyConfig {
338 mode: SallyRunMode::FallbackToDB,
339 },
340 )) => db_map.try_extend_from_slice(slice),
341 SallyColumn::TestDB((
342 test_db,
343 SallyConfig {
344 mode: SallyRunMode::FallbackToDB,
345 },
346 )) => test_db.try_extend_from_slice(slice),
347 }
348 }
349}
350
351pub enum SallyWriteBatch {
356 RocksDB(DBBatch),
358 TestDB(TestDBWriteBatch),
360}
361
362impl SallyWriteBatch {
363 pub async fn write(self) -> Result<(), TypedStoreError> {
364 match self {
365 SallyWriteBatch::RocksDB(db_batch) => db_batch.write(),
366 SallyWriteBatch::TestDB(write_batch) => write_batch.write(),
367 }
368 }
369 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
371 &mut self,
372 db: &SallyColumn<K, V>,
373 purged_vals: impl IntoIterator<Item = J>,
374 ) -> Result<(), TypedStoreError> {
375 match (self, db) {
376 (SallyWriteBatch::RocksDB(db_batch), SallyColumn::RocksDB((db_map, _))) => {
377 db_batch.delete_batch(db_map, purged_vals)
378 }
379 (SallyWriteBatch::TestDB(write_batch), SallyColumn::TestDB((test_db, _))) => {
380 write_batch.delete_batch(test_db, purged_vals)
381 }
382 _ => unimplemented!(),
383 }
384 }
385 pub fn delete_range<K: Serialize, V>(
388 &mut self,
389 db: &SallyColumn<K, V>,
390 from: &K,
391 to: &K,
392 ) -> Result<(), TypedStoreError> {
393 match (self, db) {
394 (SallyWriteBatch::RocksDB(db_batch), SallyColumn::RocksDB((db_map, _))) => {
395 db_batch.schedule_delete_range(db_map, from, to)
396 }
397 (SallyWriteBatch::TestDB(write_batch), SallyColumn::TestDB((test_db, _))) => {
398 write_batch.delete_range(test_db, from, to)
399 }
400 _ => unimplemented!(),
401 }
402 }
403 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
405 &mut self,
406 db: &SallyColumn<K, V>,
407 new_vals: impl IntoIterator<Item = (J, U)>,
408 ) -> Result<(), TypedStoreError> {
409 match (self, db) {
410 (SallyWriteBatch::RocksDB(db_batch), SallyColumn::RocksDB((db_map, _))) => {
411 db_batch.insert_batch(db_map, new_vals)?;
412 Ok(())
413 }
414 (SallyWriteBatch::TestDB(write_batch), SallyColumn::TestDB((test_db, _))) => {
415 write_batch.insert_batch(test_db, new_vals)?;
416 Ok(())
417 }
418 _ => unimplemented!(),
419 }
420 }
421}
422
423pub enum SallyIter<'a, K, V> {
425 RocksDB(RocksDBIter<'a, K, V>),
427 TestDB(TestDBIter<'a, K, V>),
428}
429
430impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SallyIter<'_, K, V> {
431 type Item = Result<(K, V), TypedStoreError>;
432 fn next(&mut self) -> Option<Self::Item> {
433 match self {
434 SallyIter::RocksDB(iter) => iter.next(),
435 SallyIter::TestDB(iter) => iter.next(),
436 }
437 }
438}
439
440impl<'a, K: Serialize, V> SallyIter<'a, K, V> {
441 pub fn skip_to(self, key: &K) -> Result<Self, TypedStoreError> {
445 let iter = match self {
446 SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_to(key)?),
447 SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_to(key)?),
448 };
449 Ok(iter)
450 }
451
452 pub fn skip_prior_to(self, key: &K) -> Result<Self, TypedStoreError> {
456 let iter = match self {
457 SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_prior_to(key)?),
458 SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_prior_to(key)?),
459 };
460 Ok(iter)
461 }
462
463 pub fn skip_to_last(self) -> Self {
465 match self {
466 SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_to_last()),
467 SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_to_last()),
468 }
469 }
470
471 pub fn reverse(self) -> SallyRevIter<'a, K, V> {
475 match self {
476 SallyIter::RocksDB(iter) => SallyRevIter::RocksDB(iter.reverse()),
477 SallyIter::TestDB(iter) => SallyRevIter::TestDB(iter.reverse()),
478 }
479 }
480}
481
482pub enum SallyRevIter<'a, K, V> {
483 RocksDB(SafeRevIter<'a, K, V>),
485 TestDB(TestDBRevIter<'a, K, V>),
486}
487
488impl<K: DeserializeOwned, V: DeserializeOwned> Iterator for SallyRevIter<'_, K, V> {
489 type Item = Result<(K, V), TypedStoreError>;
490
491 fn next(&mut self) -> Option<Self::Item> {
493 match self {
494 SallyRevIter::RocksDB(rev_iter) => rev_iter.next(),
495 SallyRevIter::TestDB(rev_iter) => rev_iter.next(),
496 }
497 }
498}
499
500pub enum SallyKeys<'a, K> {
502 RocksDB(Keys<'a, K>),
504 TestDB(TestDBKeys<'a, K>),
505}
506
507impl<K: DeserializeOwned> Iterator for SallyKeys<'_, K> {
508 type Item = Result<K, TypedStoreError>;
509
510 fn next(&mut self) -> Option<Self::Item> {
511 match self {
512 SallyKeys::RocksDB(keys) => keys.next(),
513 SallyKeys::TestDB(iter) => iter.next(),
514 }
515 }
516}
517
518pub enum SallyValues<'a, V> {
520 RocksDB(Values<'a, V>),
522 TestDB(TestDBValues<'a, V>),
523}
524
525impl<V: DeserializeOwned> Iterator for SallyValues<'_, V> {
526 type Item = Result<V, TypedStoreError>;
527
528 fn next(&mut self) -> Option<Self::Item> {
529 match self {
530 SallyValues::RocksDB(values) => values.next(),
531 SallyValues::TestDB(iter) => iter.next(),
532 }
533 }
534}
535
536pub enum SallyDBOptions {
538 RocksDB(
540 (
541 PathBuf,
542 MetricConf,
543 RocksDBAccessType,
544 Option<Options>,
545 Option<DBMapTableConfigMap>,
546 ),
547 ),
548 TestDB,
549}
550
551pub enum SallyReadOnlyDBOptions {
554 RocksDB(Box<(PathBuf, MetricConf, Option<PathBuf>, Option<Options>)>),
556 TestDB,
557}
558
559#[derive(Clone)]
561pub enum SallyColumnOptions {
562 RocksDB(DBOptions),
564 TestDB,
565}
566
567impl SallyColumnOptions {
568 pub fn get_rocksdb_options(&self) -> Option<&DBOptions> {
569 match self {
570 SallyColumnOptions::RocksDB(db_options) => Some(db_options),
571 _ => None,
572 }
573 }
574}
575
576pub fn default_column_options() -> SallyColumnOptions {
579 SallyColumnOptions::RocksDB(default_db_options())
580}
581
582#[derive(Clone)]
583pub struct SallyDBConfigMap(BTreeMap<String, SallyColumnOptions>);
584impl SallyDBConfigMap {
585 pub fn new(map: BTreeMap<String, SallyColumnOptions>) -> Self {
586 Self(map)
587 }
588
589 pub fn to_map(&self) -> BTreeMap<String, SallyColumnOptions> {
590 self.0.clone()
591 }
592}