iota_core/authority/authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::Path;
6
7use iota_types::{
8    accumulator::Accumulator, base_types::SequenceNumber, digests::TransactionEventsDigest,
9    effects::TransactionEffects, storage::MarkerValue,
10};
11use serde::{Deserialize, Serialize};
12use tracing::error;
13use typed_store::{
14    DBMapUtils,
15    metrics::SamplingInterval,
16    rocks::{
17        DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
18        read_size_from_env,
19    },
20    rocksdb::compaction_filter::Decision,
21    traits::{Map, TableSummary, TypedStoreDebug},
22};
23
24use super::*;
25use crate::authority::{
26    authority_store_pruner::ObjectsCompactionFilter,
27    authority_store_types::{
28        StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
29    },
30    epoch_start_configuration::EpochStartConfiguration,
31};
32
// Environment variables that override the RocksDB block-cache size (in MiB)
// for individual column families of the perpetual store. Each is read via
// `read_size_from_env` in the per-table config functions at the bottom of
// this file; the fallback sizes are hard-coded there.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
38
/// Options to apply to every column family of the `perpetual` DB.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    pub enable_write_stall: bool,
    /// Optional compaction filter installed on the `objects` column family
    /// (see `objects_table_config`); lets the pruner drop entries during
    /// RocksDB compaction.
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}
46
47impl AuthorityPerpetualTablesOptions {
48    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
49        if !self.enable_write_stall {
50            db_options = db_options.disable_write_throttling();
51        }
52        db_options
53    }
54}
55
/// AuthorityPerpetualTables contains data that must be preserved from one epoch
/// to the next.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of
    /// the object, namely the state that is needed to process new
    /// transactions. State is represented by `StoreObject` enum, which is
    /// either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will
    /// eventually prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as
    /// inputs in some TransactionEffects. Simply pruning all objects but
    /// the most recent is an error! This is because there can be partially
    /// executed transactions whose effects have not yet been written out,
    /// and which must be retried. But, they cannot be retried unless their
    /// input objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Object references of currently active objects that can be mutated.
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// This is a map between the transaction digest and the corresponding
    /// transaction that's known to be executable. This means that it may
    /// have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of
    /// its execution. We store effects into this table in two different
    /// cases:
    /// 1. When a transaction is synced through state_sync, we store the effects
    ///    here. These effects are known to be final in the network, but may not
    ///    have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the
    ///    effects here. This means that it's possible to store the same effects
    ///    twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction
    /// didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this
    /// table since the `effects` table doesn't say anything about the
    /// execution status of the transaction on this node. When we wait for
    /// transactions to be executed, we wait for them to appear in this
    /// table. When we revert transactions, we remove them from both tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events emitted by executed transactions, keyed by the events digest and
    /// the index of the event within the transaction.
    ///
    /// Currently this is needed in the validator for returning events during
    /// process certificates. We could potentially remove this if we decided
    /// not to provide events in the execution path.
    /// TODO: Figure out what to do with this table in the long run.
    /// Also we need a pruning policy for this table. We can prune this table
    /// along with tx/effects.
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Epoch and checkpoint of transactions finalized by checkpoint
    /// executor. Currently, mainly used to implement JSON RPC `ReadApi`.
    /// Note, there is a table with the same name in
    /// `AuthorityEpochTables`/`AuthorityPerEpochStore`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Finalized root state accumulator for epoch, to be included in
    /// CheckpointSummary of last checkpoint of epoch. These values should
    /// only ever be written once and never changed.
    pub(crate) root_state_hash_by_epoch: DBMap<EpochId, (CheckpointSequenceNumber, Accumulator)>,

    /// Parameters of the system fixed at the epoch start
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores latest pruned checkpoint. Used to keep
    /// objects pruner progress
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// The total IOTA supply and the epoch at which it was stored.
    /// We check and update it at the end of each epoch if expensive checks are
    /// enabled.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Expected imbalance between storage fund balance and the sum of storage
    /// rebate of all live objects. This could be non-zero due to bugs in
    /// earlier protocol versions. This number is the result of
    /// storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and
    /// the version at which they were received. This is used to prevent
    /// possible race conditions around receiving objects (since they are
    /// not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned
    /// per-epoch, and all previous epochs other than the current epoch may
    /// be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
151
/// Tables owned by the object pruner, stored in their own `pruner` DB
/// (separate from the `perpetual` DB).
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Latest pruned tombstone version per object ID, recording pruner
    /// progress for deleted/wrapped objects.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
156
157impl AuthorityPrunerTables {
158    pub fn path(parent_path: &Path) -> PathBuf {
159        parent_path.join("pruner")
160    }
161
162    pub fn open(parent_path: &Path) -> Self {
163        Self::open_tables_read_write(
164            Self::path(parent_path),
165            MetricConf::new("pruner")
166                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
167            None,
168            None,
169        )
170    }
171}
172
/// The total IOTA supply used during conservation checks.
/// Stored as the singleton value of the `total_iota_supply` table.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// The IOTA supply at the time of `last_check_epoch`.
    pub(crate) total_supply: u64,
    /// The epoch at which the total supply was last checked or updated.
    pub(crate) last_check_epoch: EpochId,
}
181
impl AuthorityPerpetualTables {
    /// Directory holding the `perpetual` DB under `parent_path`.
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("perpetual")
    }

    /// Opens the perpetual tables in read-write mode.
    ///
    /// `db_options_override` (or the default options when `None`) is applied
    /// to the write-throughput-optimized base options, then per-table tuning
    /// is layered on for the `objects`, `live_owned_object_markers`,
    /// `transactions`, `effects` and `events` column families. DB metrics are
    /// sampled once per minute.
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
    ) -> Self {
        let db_options_override = db_options_override.unwrap_or_default();
        let db_options =
            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "objects".to_string(),
                objects_table_config(db_options.clone(), db_options_override.compaction_filter),
            ),
            (
                "live_owned_object_markers".to_string(),
                live_owned_object_markers_table_config(db_options.clone()),
            ),
            (
                "transactions".to_string(),
                transactions_table_config(db_options.clone()),
            ),
            (
                "effects".to_string(),
                effects_table_config(db_options.clone()),
            ),
            (
                "events".to_string(),
                events_table_config(db_options.clone()),
            ),
        ]));
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            Some(db_options.options),
            Some(table_options),
        )
    }

    /// Opens a read-only handle to the perpetual tables (no table-specific
    /// option overrides).
    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
        Self::get_read_only_handle(
            Self::path(parent_path),
            None,
            None,
            MetricConf::new("perpetual_readonly"),
        )
    }

    // This is used by indexer to find the correct version of dynamic field child
    // object. We do not store the version of the child object, but because of
    // lamport timestamp, we know the child must have version number less then
    // or eq to the parent.
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        // Reverse iteration starts at the upper bound, so the first entry is
        // the highest version of `object_id` that is <= `version`.
        let mut iter = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey(object_id, version)),
        )?;
        match iter.next() {
            Some(Ok((key, o))) => self.object(&key, o),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }

    /// Builds an `Object` from a raw store value for `object_key`.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, IotaError> {
        try_construct_object(object_key, store_object)
    }

    // Constructs `iota_types::object::Object` from `StoreObjectWrapper`.
    // Returns `None` if object was deleted/wrapped
    pub fn object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<Option<Object>, IotaError> {
        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
            return Ok(None);
        };
        Ok(Some(self.construct_object(object_key, store_object)?))
    }

    /// Computes the `ObjectRef` for a stored entry. Deleted and wrapped
    /// entries yield the corresponding tombstone digest instead of a content
    /// digest.
    pub fn object_reference(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<ObjectRef, IotaError> {
        let obj_ref = match store_object.migrate().into_inner() {
            StoreObject::Value(object) => self
                .construct_object(object_key, object)?
                .compute_object_reference(),
            StoreObject::Deleted => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            ),
            StoreObject::Wrapped => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            ),
        };
        Ok(obj_ref)
    }

    /// Returns the tombstone `ObjectRef` for a deleted or wrapped entry, or
    /// `None` when the entry holds a live object value.
    pub fn tombstone_reference(
        &self,
        object_key: &ObjectKey,
        store_object: &StoreObjectWrapper,
    ) -> Result<Option<ObjectRef>, IotaError> {
        let obj_ref = match store_object.inner() {
            StoreObject::Deleted => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            )),
            StoreObject::Wrapped => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            )),
            _ => None,
        };
        Ok(obj_ref)
    }

    /// Returns the `ObjectRef` of the highest-versioned entry for `object_id`
    /// (live object or tombstone), or `None` if the ID is unknown.
    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, IotaError> {
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some(self.object_reference(&object_key, value)?));
            }
        }
        Ok(None)
    }

    /// Like `get_latest_object_ref_or_tombstone`, but returns the raw key and
    /// store value instead of an `ObjectRef`.
    pub fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some((object_key, value)));
            }
        }
        Ok(None)
    }

    /// Reads the epoch to resume from after a restart, taken from the stored
    /// epoch start configuration. Panics if no configuration has been written.
    pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
        Ok(self
            .epoch_start_configuration
            .get(&())?
            .expect("Must have current epoch.")
            .epoch_start_state()
            .epoch())
    }

    /// Persists the epoch start configuration (singleton table, written via a
    /// batch so the update is atomic).
    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> IotaResult {
        let mut wb = self.epoch_start_configuration.batch();
        wb.insert_batch(
            &self.epoch_start_configuration,
            std::iter::once(((), epoch_start_configuration)),
        )?;
        wb.write()?;
        Ok(())
    }

    /// Returns the highest checkpoint the pruner has completed, if any.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }

    /// Records pruner progress in the provided write batch; the caller is
    /// responsible for committing `wb`.
    pub fn set_highest_pruned_checkpoint(
        &self,
        wb: &mut DBBatch,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
        Ok(())
    }

    /// Looks up an executable transaction by digest.
    pub fn get_transaction(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TrustedTransaction>> {
        let Some(transaction) = self.transactions.get(digest)? else {
            return Ok(None);
        };
        Ok(Some(transaction))
    }

    /// Returns the effects of a locally executed transaction: resolves the
    /// effects digest via `executed_effects`, then loads from `effects`.
    /// Returns `None` if the transaction has not been executed on this node.
    pub fn get_effects(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TransactionEffects>> {
        let Some(effect_digest) = self.executed_effects.get(digest)? else {
            return Ok(None);
        };
        Ok(self.effects.get(&effect_digest)?)
    }

    /// Returns the (epoch, checkpoint) in which the transaction was finalized,
    /// if known.
    pub fn get_checkpoint_sequence_number(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
    }

    /// Lists all stored keys for `object.0` with a version strictly greater
    /// than `object.1`, in ascending version order.
    pub fn get_newer_object_keys(
        &self,
        object: &(ObjectID, SequenceNumber),
    ) -> IotaResult<Vec<ObjectKey>> {
        let mut objects = vec![];
        for result in self.objects.safe_iter_with_bounds(
            Some(ObjectKey(object.0, object.1.next())),
            Some(ObjectKey(object.0, VersionNumber::MAX_VALID_EXCL)),
        ) {
            let (key, _) = result?;
            objects.push(key);
        }
        Ok(objects)
    }

    /// Convenience wrapper around `set_highest_pruned_checkpoint` that creates
    /// and commits its own write batch.
    pub fn set_highest_pruned_checkpoint_without_wb(
        &self,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        let mut wb = self.pruned_checkpoint.batch();
        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        wb.write()?;
        Ok(())
    }

    /// Returns true when the `objects` table contains no entries (used as a
    /// proxy for "fresh database").
    pub fn database_is_empty(&self) -> IotaResult<bool> {
        Ok(self.objects.safe_iter().next().is_none())
    }

    /// Iterates the live object set (latest version per object, skipping
    /// tombstones) over the whole `objects` table.
    pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
        LiveSetIter {
            iter: self.objects.safe_iter(),
            tables: self,
            prev: None,
        }
    }

    /// Iterates the live object set restricted to the inclusive ID range
    /// [`lower_bound`, `upper_bound`]; `None` means unbounded on that side.
    pub fn range_iter_live_object_set(
        &self,
        lower_bound: Option<ObjectID>,
        upper_bound: Option<ObjectID>,
    ) -> LiveSetIter<'_> {
        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);

        LiveSetIter {
            iter: self.objects.safe_iter_with_bounds(lower_bound, upper_bound),
            tables: self,
            prev: None,
        }
    }

    /// Creates a RocksDB checkpoint (consistent on-disk snapshot) at `path`.
    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        // This checkpoints the entire db and not just objects table
        self.objects.checkpoint_db(path).map_err(Into::into)
    }

    /// Clears the listed tables so execution can restart from genesis, then
    /// flushes the DB. Note that `transactions` and `effects` are not cleared
    /// here.
    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        // TODO: Add new tables that get added to the db automatically
        self.objects.unsafe_clear()?;
        self.live_owned_object_markers.unsafe_clear()?;
        self.executed_effects.unsafe_clear()?;
        self.events.unsafe_clear()?;
        self.executed_transactions_to_checkpoint.unsafe_clear()?;
        self.root_state_hash_by_epoch.unsafe_clear()?;
        self.epoch_start_configuration.unsafe_clear()?;
        self.pruned_checkpoint.unsafe_clear()?;
        self.total_iota_supply.unsafe_clear()?;
        self.expected_storage_fund_imbalance.unsafe_clear()?;
        self.object_per_epoch_marker_table.unsafe_clear()?;
        self.objects.rocksdb.flush()?;
        Ok(())
    }

    /// Returns the finalized root state accumulator for `epoch`, if written.
    pub fn get_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
    }

    /// Writes the root state accumulator for `epoch`. Per the table contract,
    /// this value should only ever be written once per epoch.
    pub fn insert_root_state_hash(
        &self,
        epoch: EpochId,
        last_checkpoint_of_epoch: CheckpointSequenceNumber,
        accumulator: Accumulator,
    ) -> IotaResult {
        self.root_state_hash_by_epoch
            .insert(&epoch, &(last_checkpoint_of_epoch, accumulator))?;
        Ok(())
    }

    /// Test-only helper: inserts a single object into the `objects` table,
    /// keyed by its computed object reference.
    pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
        let object_reference = object.compute_object_reference();
        let wrapper = get_store_object(object);
        let mut wb = self.objects.batch();
        wb.insert_batch(
            &self.objects,
            std::iter::once((ObjectKey::from(object_reference), wrapper)),
        )?;
        wb.write()?;
        Ok(())
    }
}
521
impl ObjectStore for AuthorityPerpetualTables {
    /// Read an object and return it, or Ok(None) if the object was not found.
    fn try_get_object(
        &self,
        object_id: &ObjectID,
    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
        // Reverse iteration from the max key for this ID yields the
        // highest-versioned entry first; there is no lower bound, so the guard
        // below checks the entry actually belongs to `object_id`.
        let obj_entry = self
            .objects
            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))
            .map_err(iota_types::storage::error::Error::custom)?
            .next();

        match obj_entry.transpose()? {
            // `self.object` returns Ok(None) for deleted/wrapped entries.
            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
                .object(&ObjectKey(obj_id, version), obj)
                .map_err(iota_types::storage::error::Error::custom)?),
            _ => Ok(None),
        }
    }

    /// Point lookup of an object at an exact version; Ok(None) when the
    /// (id, version) key is absent or the entry is a deleted/wrapped marker.
    fn try_get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
        Ok(self
            .objects
            .get(&ObjectKey(*object_id, version))
            .map_err(iota_types::storage::error::Error::custom)?
            .map(|object| self.object(&ObjectKey(*object_id, version), object))
            .transpose()
            .map_err(iota_types::storage::error::Error::custom)?
            .flatten())
    }
}
557
/// Iterator over the live object set of the `objects` table (see the
/// `Iterator` impl below for how the latest version per object is selected).
pub struct LiveSetIter<'a> {
    // Underlying key-ordered iterator over the `objects` table.
    iter:
        <DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::SafeIterator,
    // Needed to reconstruct `Object`s from raw store values.
    tables: &'a AuthorityPerpetualTables,
    // Most recently seen entry, held back until we know whether a later
    // version of the same object follows.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}
564
/// An entry of the live object set: either a fully materialized object or a
/// wrapped object identified only by its key.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}
570
571impl LiveObject {
572    pub fn object_id(&self) -> ObjectID {
573        match self {
574            LiveObject::Normal(obj) => obj.id(),
575            LiveObject::Wrapped(key) => key.0,
576        }
577    }
578
579    pub fn version(&self) -> SequenceNumber {
580        match self {
581            LiveObject::Normal(obj) => obj.version(),
582            LiveObject::Wrapped(key) => key.1,
583        }
584    }
585
586    pub fn object_reference(&self) -> ObjectRef {
587        match self {
588            LiveObject::Normal(obj) => obj.compute_object_reference(),
589            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
590        }
591    }
592
593    pub fn to_normal(self) -> Option<Object> {
594        match self {
595            LiveObject::Normal(object) => Some(object),
596            LiveObject::Wrapped(_) => None,
597        }
598    }
599}
600
601impl LiveSetIter<'_> {
602    fn store_object_wrapper_to_live_object(
603        &self,
604        object_key: ObjectKey,
605        store_object: StoreObjectWrapper,
606    ) -> Option<LiveObject> {
607        match store_object.migrate().into_inner() {
608            StoreObject::Value(object) => {
609                let object = self
610                    .tables
611                    .construct_object(&object_key, object)
612                    .expect("Constructing object from store cannot fail");
613                Some(LiveObject::Normal(object))
614            }
615            StoreObject::Wrapped | StoreObject::Deleted => None,
616        }
617    }
618}
619
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    // The underlying iterator visits entries in ascending (ObjectID, version)
    // key order, so the last entry seen for a given ID is its latest version.
    // We hold each entry in `prev` and only emit it once the next entry proves
    // we have moved on to a different object ID; deleted/wrapped entries are
    // skipped (the conversion returns None).
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev {
                    // ID changed: `prev` was the latest version of its object.
                    if prev_key.0 != next_key.0 {
                        let live_object =
                            self.store_object_wrapper_to_live_object(prev_key, prev_value);
                        if live_object.is_some() {
                            return live_object;
                        }
                    }
                }
                continue;
            }
            // Underlying iterator exhausted: flush the held-back final entry.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
650
651// These functions are used to initialize the DB tables
652fn live_owned_object_markers_table_config(db_options: DBOptions) -> DBOptions {
653    DBOptions {
654        options: db_options
655            .clone()
656            .optimize_for_write_throughput()
657            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
658            .options,
659        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
660    }
661}
662
663fn objects_table_config(
664    mut db_options: DBOptions,
665    compaction_filter: Option<ObjectsCompactionFilter>,
666) -> DBOptions {
667    if let Some(mut compaction_filter) = compaction_filter {
668        db_options
669            .options
670            .set_compaction_filter("objects", move |_, key, value| {
671                match compaction_filter.filter(key, value) {
672                    Ok(decision) => decision,
673                    Err(err) => {
674                        error!("Compaction error: {:?}", err);
675                        Decision::Keep
676                    }
677                }
678            });
679    }
680    db_options
681        .optimize_for_write_throughput()
682        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
683}
684
685fn transactions_table_config(db_options: DBOptions) -> DBOptions {
686    db_options
687        .optimize_for_write_throughput()
688        .optimize_for_point_lookup(
689            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
690        )
691}
692
693fn effects_table_config(db_options: DBOptions) -> DBOptions {
694    db_options
695        .optimize_for_write_throughput()
696        .optimize_for_point_lookup(
697            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
698        )
699}
700
701fn events_table_config(db_options: DBOptions) -> DBOptions {
702    db_options
703        .optimize_for_write_throughput()
704        .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
705}