iota_core/authority/authority_store_tables.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::path::Path;

use iota_types::{
    accumulator::Accumulator, base_types::SequenceNumber, digests::TransactionEventsDigest,
    effects::TransactionEffects, storage::MarkerValue,
};
use serde::{Deserialize, Serialize};
use typed_store::{
    DBMapUtils,
    metrics::SamplingInterval,
    rocks::{
        DBBatch, DBMap, DBOptions, MetricConf, ReadWriteOptions, default_db_options,
        read_size_from_env,
        util::{empty_compaction_filter, reference_count_merge_operator},
    },
    rocksdb::Options,
    traits::{Map, TableSummary, TypedStoreDebug},
};

use super::*;
use crate::authority::{
    authority_store_types::{
        ObjectContentDigest, StoreData, StoreMoveObjectWrapper, StoreObject, StoreObjectPair,
        StoreObjectValue, StoreObjectWrapper, get_store_object_pair, try_construct_object,
    },
    epoch_start_configuration::EpochStartConfiguration,
};

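// These environment variables, when set, override the default block cache size
// (in MB) of the corresponding table's column family; see the
// `*_table_default_config` functions at the bottom of this file.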
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
const ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE: &str = "INDIRECT_OBJECTS_BLOCK_CACHE_MB";

/// AuthorityPerpetualTables contains data that must be preserved from one epoch
/// to the next.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of
    /// the object, namely the state that is needed to process new
    /// transactions. State is represented by the `StoreObject` enum, which is
    /// either a move module, a move object, or a pointer to an object
    /// stored in the `indirect_move_objects` table.
    ///
    /// Note that while this map can store all versions of an object, we will
    /// eventually prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as
    /// inputs in some TransactionEffects. Simply pruning all objects but
    /// the most recent is an error! This is because there can be partially
    /// executed transactions whose effects have not yet been written out,
    /// and which must be retried. But, they cannot be retried unless their
    /// input objects are still accessible!
    #[default_options_override_fn = "objects_table_default_config"]
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

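    /// Contents of Move objects that are stored out-of-line ("indirectly") and
    /// referenced from the `objects` table by their content digest. Entries are
    /// reference counted via the merge operator configured in
    /// `indirect_move_objects_table_default_config`.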
    #[default_options_override_fn = "indirect_move_objects_table_default_config"]
    pub(crate) indirect_move_objects: DBMap<ObjectContentDigest, StoreMoveObjectWrapper>,

    /// Object references of currently active objects that can be mutated.
    #[default_options_override_fn = "live_owned_object_markers_table_default_config"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// This is a map between the transaction digest and the corresponding
    /// transaction that's known to be executable. This means that it may
    /// have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    #[default_options_override_fn = "transactions_table_default_config"]
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map from the transaction digest of a certificate to the effects of
    /// its execution. We store effects into this table in two different
    /// cases:
    /// 1. When a transaction is synced through state_sync, we store the effects
    ///    here. These effects are known to be final in the network, but may not
    ///    have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the
    ///    effects here. This means that it's possible to store the same effects
    ///    twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction
    /// didn't make it into the epoch.
    #[default_options_override_fn = "effects_table_default_config"]
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this
    /// table since the `effects` table doesn't say anything about the
    /// execution status of the transaction on this node. When we wait for
    /// transactions to be executed, we wait for them to appear in this
    /// table. When we revert transactions, we remove them from both tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    // Currently this is needed in the validator for returning events during certificate
    // processing. We could potentially remove this if we decided not to provide events in the
    // execution path.
    // TODO: Figure out what to do with this table in the long run.
    // We also need a pruning policy for this table. We can prune it along with tx/effects.
    #[default_options_override_fn = "events_table_default_config"]
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Epoch and checkpoint of transactions finalized by the checkpoint
    /// executor. Currently mainly used to implement the JSON-RPC `ReadApi`.
    /// Note that there is a table with the same name in
    /// `AuthorityEpochTables`/`AuthorityPerEpochStore`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    // Finalized root state accumulator for the epoch, to be included in the
    // CheckpointSummary of the last checkpoint of the epoch. These values should
    // only ever be written once and never changed.
    pub(crate) root_state_hash_by_epoch: DBMap<EpochId, (CheckpointSequenceNumber, Accumulator)>,

    /// Parameters of the system fixed at the epoch start.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores the latest pruned checkpoint. Used to
    /// track the objects pruner's progress.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// The total IOTA supply and the epoch at which it was stored.
    /// We check and update it at the end of each epoch if expensive checks are
    /// enabled.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Expected imbalance between storage fund balance and the sum of storage
    /// rebate of all live objects. This could be non-zero due to bugs in
    /// earlier protocol versions. This number is the result of
    /// storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and
    /// the version at which they were received. This is used to prevent
    /// possible race conditions around receiving objects (since they are
    /// not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned
    /// per-epoch, and all epochs other than the current one may be pruned
    /// safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}

/// The total IOTA supply used during conservation checks.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// The IOTA supply at the time of `last_check_epoch`.
    pub(crate) total_supply: u64,
    /// The epoch at which the total supply was last checked or updated.
    pub(crate) last_check_epoch: EpochId,
}

impl AuthorityPerpetualTables {
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("perpetual")
    }

    pub fn open(parent_path: &Path, db_options: Option<Options>) -> Self {
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            db_options,
            None,
        )
    }
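    // A minimal usage sketch for `open` (illustrative only; `db_dir` is a
    // hypothetical directory owned by the caller):
    //
    //     let tables = AuthorityPerpetualTables::open(&db_dir, None);
    //     assert!(tables.database_is_empty()?);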

    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
        Self::get_read_only_handle(
            Self::path(parent_path),
            None,
            None,
            MetricConf::new("perpetual_readonly"),
        )
    }

    // This is used by the indexer to find the correct version of a dynamic field
    // child object. We do not store the version of the child object, but because
    // of Lamport timestamps, we know the child's version must be less than or
    // equal to the parent's.
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        let iter = self
            .objects
            .safe_range_iter(ObjectKey::min_for_id(&object_id)..=ObjectKey::max_for_id(&object_id))
            .skip_prior_to(&ObjectKey(object_id, version))?;
        match iter.reverse().next() {
            Some(Ok((key, o))) => self.object(&key, o),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
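    // Sketch of the intended lookup through `find_object_lt_or_eq_version`
    // (hypothetical `child_id` and `parent_version`; the child is created or
    // mutated by the same transaction as its parent, so its version is bounded
    // by the parent's):
    //
    //     let child = tables.find_object_lt_or_eq_version(child_id, parent_version)?;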

    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, IotaError> {
        let indirect_object = match store_object.data {
            StoreData::IndirectObject(ref metadata) => self
                .indirect_move_objects
                .get(&metadata.digest)?
                .map(|o| o.migrate().into_inner()),
            _ => None,
        };
        try_construct_object(object_key, store_object, indirect_object)
    }

    // Constructs `iota_types::object::Object` from `StoreObjectWrapper`.
    // Returns `None` if the object was deleted or wrapped.
    pub fn object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<Option<Object>, IotaError> {
        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
            return Ok(None);
        };
        Ok(Some(self.construct_object(object_key, store_object)?))
    }

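    /// Returns the object reference for the entry at `object_key`: live objects
    /// are reconstructed so their digest can be computed, while deleted and
    /// wrapped entries yield the corresponding tombstone digests.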
    pub fn object_reference(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<ObjectRef, IotaError> {
        let obj_ref = match store_object.migrate().into_inner() {
            StoreObject::Value(object) => self
                .construct_object(object_key, object)?
                .compute_object_reference(),
            StoreObject::Deleted => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            ),
            StoreObject::Wrapped => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            ),
        };
        Ok(obj_ref)
    }

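    /// Returns the tombstone reference (deleted or wrapped) for the entry at
    /// `object_key`, or `None` if the entry holds a live object.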
    pub fn tombstone_reference(
        &self,
        object_key: &ObjectKey,
        store_object: &StoreObjectWrapper,
    ) -> Result<Option<ObjectRef>, IotaError> {
        let obj_ref = match store_object.inner() {
            StoreObject::Deleted => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            )),
            StoreObject::Wrapped => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            )),
            _ => None,
        };
        Ok(obj_ref)
    }

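    /// Returns the reference (or tombstone reference) of the latest version of
    /// `object_id`, by iterating backwards from the maximum possible key for
    /// that id.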
    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, IotaError> {
        let mut iterator = self
            .objects
            .unbounded_iter()
            .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;

        if let Some((object_key, value)) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some(self.object_reference(&object_key, value)?));
            }
        }
        Ok(None)
    }

    pub fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
        let mut iterator = self
            .objects
            .unbounded_iter()
            .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;

        if let Some((object_key, value)) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some((object_key, value)));
            }
        }
        Ok(None)
    }

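    /// Returns the epoch recorded in the stored epoch start configuration; used
    /// to recover the current epoch when the node restarts.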
    pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
        Ok(self
            .epoch_start_configuration
            .get(&())?
            .expect("Must have current epoch.")
            .epoch_start_state()
            .epoch())
    }

    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> IotaResult {
        let mut wb = self.epoch_start_configuration.batch();
        wb.insert_batch(
            &self.epoch_start_configuration,
            std::iter::once(((), epoch_start_configuration)),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        Ok(self.pruned_checkpoint.get(&())?.unwrap_or_default())
    }

    pub fn set_highest_pruned_checkpoint(
        &self,
        wb: &mut DBBatch,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
        Ok(())
    }

    pub fn get_transaction(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TrustedTransaction>> {
        let Some(transaction) = self.transactions.get(digest)? else {
            return Ok(None);
        };
        Ok(Some(transaction))
    }

    pub fn get_effects(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TransactionEffects>> {
        let Some(effect_digest) = self.executed_effects.get(digest)? else {
            return Ok(None);
        };
        Ok(self.effects.get(&effect_digest)?)
    }

    pub fn get_checkpoint_sequence_number(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
    }

    pub fn get_newer_object_keys(
        &self,
        object: &(ObjectID, SequenceNumber),
    ) -> IotaResult<Vec<ObjectKey>> {
        let mut objects = vec![];
        for result in self.objects.safe_iter_with_bounds(
            Some(ObjectKey(object.0, object.1.next())),
            Some(ObjectKey(object.0, VersionNumber::MAX)),
        ) {
            let (key, _) = result?;
            objects.push(key);
        }
        Ok(objects)
    }

    pub fn set_highest_pruned_checkpoint_without_wb(
        &self,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        let mut wb = self.pruned_checkpoint.batch();
        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        wb.write()?;
        Ok(())
    }

    pub fn database_is_empty(&self) -> IotaResult<bool> {
        Ok(self
            .objects
            .unbounded_iter()
            .skip_to(&ObjectKey::ZERO)?
            .next()
            .is_none())
    }

    pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
        LiveSetIter {
            iter: self.objects.unbounded_iter(),
            tables: self,
            prev: None,
        }
    }
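    // Example use of `iter_live_object_set` (illustrative; prints the reference
    // of every live object through a hypothetical `tables` handle):
    //
    //     for live in tables.iter_live_object_set() {
    //         println!("{:?}", live.object_reference());
    //     }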

    pub fn range_iter_live_object_set(
        &self,
        lower_bound: Option<ObjectID>,
        upper_bound: Option<ObjectID>,
    ) -> LiveSetIter<'_> {
        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);

        LiveSetIter {
            iter: self.objects.iter_with_bounds(lower_bound, upper_bound),
            tables: self,
            prev: None,
        }
    }

    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        // This checkpoints the entire db and not just the objects table.
        self.objects.checkpoint_db(path).map_err(Into::into)
    }

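    /// Clears the object, marker, event, executed-effect, and epoch metadata
    /// tables so that execution can be replayed from genesis. The digest-keyed
    /// `transactions` and `effects` tables are left in place.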
    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        // TODO: Add new tables that get added to the db automatically
        self.objects.unsafe_clear()?;
        self.indirect_move_objects.unsafe_clear()?;
        self.live_owned_object_markers.unsafe_clear()?;
        self.executed_effects.unsafe_clear()?;
        self.events.unsafe_clear()?;
        self.executed_transactions_to_checkpoint.unsafe_clear()?;
        self.root_state_hash_by_epoch.unsafe_clear()?;
        self.epoch_start_configuration.unsafe_clear()?;
        self.pruned_checkpoint.unsafe_clear()?;
        self.total_iota_supply.unsafe_clear()?;
        self.expected_storage_fund_imbalance.unsafe_clear()?;
        self.object_per_epoch_marker_table.unsafe_clear()?;
        self.objects.rocksdb.flush()?;
        Ok(())
    }

    pub fn get_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
    }

    pub fn insert_root_state_hash(
        &self,
        epoch: EpochId,
        last_checkpoint_of_epoch: CheckpointSequenceNumber,
        accumulator: Accumulator,
    ) -> IotaResult {
        self.root_state_hash_by_epoch
            .insert(&epoch, &(last_checkpoint_of_epoch, accumulator))?;
        Ok(())
    }

    pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
        let object_reference = object.compute_object_reference();
        let StoreObjectPair(wrapper, _indirect_object) = get_store_object_pair(object, usize::MAX);
        let mut wb = self.objects.batch();
        wb.insert_batch(
            &self.objects,
            std::iter::once((ObjectKey::from(object_reference), wrapper)),
        )?;
        wb.write()?;
        Ok(())
    }
}

impl ObjectStore for AuthorityPerpetualTables {
    /// Read an object and return it, or Ok(None) if the object was not found.
    fn get_object(
        &self,
        object_id: &ObjectID,
    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
        let obj_entry = self
            .objects
            .unbounded_iter()
            .skip_prior_to(&ObjectKey::max_for_id(object_id))
            .map_err(iota_types::storage::error::Error::custom)?
            .next();

        match obj_entry {
            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
                .object(&ObjectKey(obj_id, version), obj)
                .map_err(iota_types::storage::error::Error::custom)?),
            _ => Ok(None),
        }
    }

    fn get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
        Ok(self
            .objects
            .get(&ObjectKey(*object_id, version))
            .map_err(iota_types::storage::error::Error::custom)?
            .map(|object| self.object(&ObjectKey(*object_id, version), object))
            .transpose()
            .map_err(iota_types::storage::error::Error::custom)?
            .flatten())
    }
}
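// Example of point lookups through the `ObjectStore` trait above (sketch;
// `tables`, `id`, and `version` are assumed to be in the caller's scope, with
// the trait in scope as well):
//
//     let latest = tables.get_object(&id)?;
//     let at_version = tables.get_object_by_key(&id, version)?;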

pub struct LiveSetIter<'a> {
    iter:
        <DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::Iterator,
    tables: &'a AuthorityPerpetualTables,
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}

#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}

impl LiveObject {
    pub fn object_id(&self) -> ObjectID {
        match self {
            LiveObject::Normal(obj) => obj.id(),
            LiveObject::Wrapped(key) => key.0,
        }
    }

    pub fn version(&self) -> SequenceNumber {
        match self {
            LiveObject::Normal(obj) => obj.version(),
            LiveObject::Wrapped(key) => key.1,
        }
    }

    pub fn object_reference(&self) -> ObjectRef {
        match self {
            LiveObject::Normal(obj) => obj.compute_object_reference(),
            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
        }
    }

    pub fn to_normal(self) -> Option<Object> {
        match self {
            LiveObject::Normal(object) => Some(object),
            LiveObject::Wrapped(_) => None,
        }
    }
}

impl LiveSetIter<'_> {
    fn store_object_wrapper_to_live_object(
        &self,
        object_key: ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Option<LiveObject> {
        match store_object.migrate().into_inner() {
            StoreObject::Value(object) => {
                let object = self
                    .tables
                    .construct_object(&object_key, object)
                    .expect("Constructing object from store cannot fail");
                Some(LiveObject::Normal(object))
            }
            StoreObject::Wrapped | StoreObject::Deleted => None,
        }
    }
}

impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some((next_key, next_value)) = self.iter.next() {
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev {
                    if prev_key.0 != next_key.0 {
                        let live_object =
                            self.store_object_wrapper_to_live_object(prev_key, prev_value);
                        if live_object.is_some() {
                            return live_object;
                        }
                    }
                }
                continue;
            }
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}

// These functions are used to initialize the DB tables
fn live_owned_object_markers_table_default_config() -> DBOptions {
    DBOptions {
        options: default_db_options()
            .optimize_for_write_throughput()
            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
            .options,
        rw_options: ReadWriteOptions::default().set_ignore_range_deletions(false),
    }
}

fn objects_table_default_config() -> DBOptions {
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
}

fn transactions_table_default_config() -> DBOptions {
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(
            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
        )
}

fn effects_table_default_config() -> DBOptions {
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(
            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
        )
}

fn events_table_default_config() -> DBOptions {
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
}

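// Note: the indirect objects table is reference counted: the merge operator
// configured below maintains per-digest reference counts, and the compaction
// filter is meant to drop entries once their value has been emptied.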
fn indirect_move_objects_table_default_config() -> DBOptions {
    let mut options = default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(
            read_size_from_env(ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(512),
        );
    options.options.set_merge_operator(
        "refcount operator",
        reference_count_merge_operator,
        reference_count_merge_operator,
    );
    options
        .options
        .set_compaction_filter("empty filter", empty_compaction_filter);
    options
}