iota_core/authority/
authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::Path;
6
7use iota_types::{
8    accumulator::Accumulator, base_types::SequenceNumber, digests::TransactionEventsDigest,
9    effects::TransactionEffects, storage::MarkerValue,
10};
11use serde::{Deserialize, Serialize};
12use typed_store::{
13    DBMapUtils,
14    metrics::SamplingInterval,
15    rocks::{
16        DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
17        read_size_from_env,
18        util::{empty_compaction_filter, reference_count_merge_operator},
19    },
20    traits::{Map, TableSummary, TypedStoreDebug},
21};
22
23use super::*;
24use crate::authority::{
25    authority_store_types::{
26        ObjectContentDigest, StoreData, StoreMoveObjectWrapper, StoreObject, StoreObjectPair,
27        StoreObjectValue, StoreObjectWrapper, get_store_object_pair, try_construct_object,
28    },
29    epoch_start_configuration::EpochStartConfiguration,
30};
31
// Environment variables that override the per-table block cache size
// (in MB, per the variable names) used by the `*_table_config` helpers
// in this file.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
const ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE: &str = "INDIRECT_OBJECTS_BLOCK_CACHE_MB";
38
/// Options to apply to every column family of the `perpetual` DB.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    /// `Default` yields `false`, i.e. write throttling is disabled
    /// (see `apply_to`).
    pub enable_write_stall: bool,
}
45
46impl AuthorityPerpetualTablesOptions {
47    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
48        if !self.enable_write_stall {
49            db_options = db_options.disable_write_throttling();
50        }
51        db_options
52    }
53}
54
/// AuthorityPerpetualTables contains data that must be preserved from one epoch
/// to the next.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of
    /// the object, namely the state that is needed to process new
    /// transactions. State is represented by `StoreObject` enum, which is
    /// either a move module, a move object, or a pointer to an object
    /// stored in the `indirect_move_objects` table.
    ///
    /// Note that while this map can store all versions of an object, we will
    /// eventually prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as
    /// inputs in some TransactionEffects. Simply pruning all objects but
    /// the most recent is an error! This is because there can be partially
    /// executed transactions whose effects have not yet been written out,
    /// and which must be retried. But, they cannot be retried unless their
    /// input objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Move object contents keyed by content digest. Entries in `objects`
    /// whose data is `StoreData::IndirectObject` reference this table by
    /// digest (see `construct_object`).
    pub(crate) indirect_move_objects: DBMap<ObjectContentDigest, StoreMoveObjectWrapper>,

    /// Object references of currently active objects that can be mutated.
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// This is a map between the transaction digest and the corresponding
    /// transaction that's known to be executable. This means that it may
    /// have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of
    /// its execution. We store effects into this table in two different
    /// cases:
    /// 1. When a transaction is synced through state_sync, we store the effects
    ///    here. These effects are known to be final in the network, but may not
    ///    have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the
    ///    effects here. This means that it's possible to store the same effects
    ///    twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction
    /// didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this
    /// table since the `effects` table doesn't say anything about the
    /// execution status of the transaction on this node. When we wait for
    /// transactions to be executed, we wait for them to appear in this
    /// table. When we revert transactions, we remove them from both tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    // Currently this is needed in the validator for returning events during process certificates.
    // We could potentially remove this if we decided not to provide events in the execution path.
    // TODO: Figure out what to do with this table in the long run.
    // Also we need a pruning policy for this table. We can prune this table along with tx/effects.
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Epoch and checkpoint of transactions finalized by checkpoint
    /// executor. Currently, mainly used to implement JSON RPC `ReadApi`.
    /// Note, there is a table with the same name in
    /// `AuthorityEpochTables`/`AuthorityPerEpochStore`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    // Finalized root state accumulator for epoch, to be included in CheckpointSummary
    // of last checkpoint of epoch. These values should only ever be written once
    // and never changed
    pub(crate) root_state_hash_by_epoch: DBMap<EpochId, (CheckpointSequenceNumber, Accumulator)>,

    /// Parameters of the system fixed at the epoch start
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores latest pruned checkpoint. Used to keep
    /// objects pruner progress
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// The total IOTA supply and the epoch at which it was stored.
    /// We check and update it at the end of each epoch if expensive checks are
    /// enabled.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Expected imbalance between storage fund balance and the sum of storage
    /// rebate of all live objects. This could be non-zero due to bugs in
    /// earlier protocol versions. This number is the result of
    /// storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and
    /// the version at which they were received. This is used to prevent
    /// possible race conditions around receiving objects (since they are
    /// not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned
    /// per-epoch, and all previous epochs other than the current epoch may
    /// be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
153
/// The total IOTA supply used during conservation checks.
/// Persisted as the value of the singleton `total_iota_supply` table.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// The IOTA supply at the time of `last_check_epoch`.
    pub(crate) total_supply: u64,
    /// The epoch at which the total supply was last checked or updated.
    pub(crate) last_check_epoch: EpochId,
}
162
impl AuthorityPerpetualTables {
    /// Returns the directory of the perpetual DB under `parent_path`.
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("perpetual")
    }

    /// Opens the perpetual tables read-write. The optional
    /// `db_options_override` (defaulted if `None`) is applied on top of
    /// write-throughput-optimized defaults; several tables additionally get
    /// dedicated tuning via the config map below.
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
    ) -> Self {
        let db_options_override = db_options_override.unwrap_or_default();
        let db_options =
            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
        // Tables not listed here use the shared `db_options` unchanged.
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "objects".to_string(),
                objects_table_config(db_options.clone()),
            ),
            (
                "indirect_move_objects".to_string(),
                indirect_move_objects_table_config(db_options.clone()),
            ),
            (
                "live_owned_object_markers".to_string(),
                live_owned_object_markers_table_config(db_options.clone()),
            ),
            (
                "transactions".to_string(),
                transactions_table_config(db_options.clone()),
            ),
            (
                "effects".to_string(),
                effects_table_config(db_options.clone()),
            ),
            (
                "events".to_string(),
                events_table_config(db_options.clone()),
            ),
        ]));
        Self::open_tables_read_write(
            Self::path(parent_path),
            // Sample DB metrics once per minute.
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            Some(db_options.options),
            Some(table_options),
        )
    }

    /// Opens a read-only handle to the perpetual tables (no tuning overrides).
    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
        Self::get_read_only_handle(
            Self::path(parent_path),
            None,
            None,
            MetricConf::new("perpetual_readonly"),
        )
    }

    // This is used by indexer to find the correct version of dynamic field child
    // object. We do not store the version of the child object, but because of
    // lamport timestamp, we know the child must have version number less then
    // or eq to the parent.
    /// Returns the newest stored version of `object_id` that is `<= version`,
    /// or `Ok(None)` if there is no such entry or it is a tombstone
    /// (deleted/wrapped, see `object`).
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        let iter = self
            .objects
            .safe_range_iter(ObjectKey::min_for_id(&object_id)..=ObjectKey::max_for_id(&object_id))
            .skip_prior_to(&ObjectKey(object_id, version))?;
        match iter.reverse().next() {
            Some(Ok((key, o))) => self.object(&key, o),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }

    /// Materializes a full `Object` from a `StoreObjectValue`, fetching the
    /// content from `indirect_move_objects` by digest when the value only
    /// stores an indirect-object pointer.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, IotaError> {
        let indirect_object = match store_object.data {
            StoreData::IndirectObject(ref metadata) => self
                .indirect_move_objects
                .get(&metadata.digest)?
                .map(|o| o.migrate().into_inner()),
            _ => None,
        };
        try_construct_object(object_key, store_object, indirect_object)
    }

    // Constructs `iota_types::object::Object` from `StoreObjectWrapper`.
    // Returns `None` if object was deleted/wrapped
    pub fn object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<Option<Object>, IotaError> {
        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
            return Ok(None);
        };
        Ok(Some(self.construct_object(object_key, store_object)?))
    }

    /// Builds the `ObjectRef` for a stored entry. Live values get their real
    /// computed digest; deleted/wrapped entries get the corresponding
    /// sentinel digest.
    pub fn object_reference(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<ObjectRef, IotaError> {
        let obj_ref = match store_object.migrate().into_inner() {
            StoreObject::Value(object) => self
                .construct_object(object_key, object)?
                .compute_object_reference(),
            StoreObject::Deleted => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            ),
            StoreObject::Wrapped => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            ),
        };
        Ok(obj_ref)
    }

    /// Returns the sentinel reference for deleted/wrapped entries, or
    /// `Ok(None)` when the entry holds a live value.
    pub fn tombstone_reference(
        &self,
        object_key: &ObjectKey,
        store_object: &StoreObjectWrapper,
    ) -> Result<Option<ObjectRef>, IotaError> {
        let obj_ref = match store_object.inner() {
            StoreObject::Deleted => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            )),
            StoreObject::Wrapped => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            )),
            _ => None,
        };
        Ok(obj_ref)
    }

    /// Returns the `ObjectRef` of the highest stored version of `object_id`
    /// (tombstones included), or `Ok(None)` if the id has no entries.
    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, IotaError> {
        // Seek past the last possible key for this id, then step back one
        // entry: that entry is the highest stored version, if the id exists.
        let mut iterator = self
            .objects
            .unbounded_iter()
            .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;

        if let Some((object_key, value)) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some(self.object_reference(&object_key, value)?));
            }
        }
        Ok(None)
    }

    /// Like `get_latest_object_ref_or_tombstone`, but returns the raw store
    /// key and wrapper instead of an `ObjectRef`.
    pub fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
        let mut iterator = self
            .objects
            .unbounded_iter()
            .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;

        if let Some((object_key, value)) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some((object_key, value)));
            }
        }
        Ok(None)
    }

    /// Reads the current epoch from the persisted epoch start configuration.
    /// Panics if the configuration has never been written (a broken-invariant
    /// condition on restart).
    pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
        Ok(self
            .epoch_start_configuration
            .get(&())?
            .expect("Must have current epoch.")
            .epoch_start_state()
            .epoch())
    }

    /// Overwrites the singleton epoch start configuration.
    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> IotaResult {
        let mut wb = self.epoch_start_configuration.batch();
        wb.insert_batch(
            &self.epoch_start_configuration,
            std::iter::once(((), epoch_start_configuration)),
        )?;
        wb.write()?;
        Ok(())
    }

    /// Returns the latest pruned checkpoint, or 0 if pruning has never run.
    pub fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
        Ok(self.pruned_checkpoint.get(&())?.unwrap_or_default())
    }

    /// Records pruner progress into the caller-provided batch; the caller is
    /// responsible for committing `wb`.
    pub fn set_highest_pruned_checkpoint(
        &self,
        wb: &mut DBBatch,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
        Ok(())
    }

    /// Looks up an executable transaction by digest.
    pub fn get_transaction(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TrustedTransaction>> {
        let Some(transaction) = self.transactions.get(digest)? else {
            return Ok(None);
        };
        Ok(Some(transaction))
    }

    /// Returns the effects of a transaction *executed on this node*: resolves
    /// the digest via `executed_effects` first, then loads from `effects`.
    pub fn get_effects(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TransactionEffects>> {
        let Some(effect_digest) = self.executed_effects.get(digest)? else {
            return Ok(None);
        };
        Ok(self.effects.get(&effect_digest)?)
    }

    /// Returns the (epoch, checkpoint) in which the transaction was finalized,
    /// if recorded by the checkpoint executor.
    pub fn get_checkpoint_sequence_number(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
    }

    /// Returns all stored keys for `object.0` with a version *strictly*
    /// greater than `object.1`.
    pub fn get_newer_object_keys(
        &self,
        object: &(ObjectID, SequenceNumber),
    ) -> IotaResult<Vec<ObjectKey>> {
        let mut objects = vec![];
        for result in self.objects.safe_iter_with_bounds(
            Some(ObjectKey(object.0, object.1.next())),
            Some(ObjectKey(object.0, VersionNumber::MAX)),
        ) {
            let (key, _) = result?;
            objects.push(key);
        }
        Ok(objects)
    }

    /// Convenience wrapper around `set_highest_pruned_checkpoint` that
    /// creates and commits its own write batch.
    pub fn set_highest_pruned_checkpoint_without_wb(
        &self,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        let mut wb = self.pruned_checkpoint.batch();
        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        wb.write()?;
        Ok(())
    }

    /// Returns true when the `objects` table contains no entries. Other
    /// tables are not consulted.
    pub fn database_is_empty(&self) -> IotaResult<bool> {
        Ok(self
            .objects
            .unbounded_iter()
            .skip_to(&ObjectKey::ZERO)?
            .next()
            .is_none())
    }

    /// Iterates the live object set over the whole `objects` table.
    pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
        LiveSetIter {
            iter: self.objects.unbounded_iter(),
            tables: self,
            prev: None,
        }
    }

    /// Iterates the live object set restricted to the (inclusive) object-ID
    /// range `[lower_bound, upper_bound]`; `None` leaves that side unbounded.
    pub fn range_iter_live_object_set(
        &self,
        lower_bound: Option<ObjectID>,
        upper_bound: Option<ObjectID>,
    ) -> LiveSetIter<'_> {
        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);

        LiveSetIter {
            iter: self.objects.iter_with_bounds(lower_bound, upper_bound),
            tables: self,
            prev: None,
        }
    }

    /// Creates a RocksDB checkpoint of the database at `path`.
    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        // This checkpoints the entire db and not just objects table
        self.objects.checkpoint_db(path).map_err(Into::into)
    }

    /// Clears state so that execution can restart from genesis.
    ///
    /// NOTE(review): the `transactions` and `effects` tables are deliberately
    /// not cleared here — presumably so the stored transactions remain
    /// available for re-execution; confirm this is intentional before adding
    /// them.
    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        // TODO: Add new tables that get added to the db automatically
        self.objects.unsafe_clear()?;
        self.indirect_move_objects.unsafe_clear()?;
        self.live_owned_object_markers.unsafe_clear()?;
        self.executed_effects.unsafe_clear()?;
        self.events.unsafe_clear()?;
        self.executed_transactions_to_checkpoint.unsafe_clear()?;
        self.root_state_hash_by_epoch.unsafe_clear()?;
        self.epoch_start_configuration.unsafe_clear()?;
        self.pruned_checkpoint.unsafe_clear()?;
        self.total_iota_supply.unsafe_clear()?;
        self.expected_storage_fund_imbalance.unsafe_clear()?;
        self.object_per_epoch_marker_table.unsafe_clear()?;
        // Flush to make the cleared state durable before returning.
        self.objects.rocksdb.flush()?;
        Ok(())
    }

    /// Returns the finalized root state accumulator recorded for `epoch`,
    /// along with the checkpoint it was computed at.
    pub fn get_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
    }

    /// Records the root state accumulator for `epoch`. Per the table's
    /// contract, this value should be written once and never changed.
    pub fn insert_root_state_hash(
        &self,
        epoch: EpochId,
        last_checkpoint_of_epoch: CheckpointSequenceNumber,
        accumulator: Accumulator,
    ) -> IotaResult {
        self.root_state_hash_by_epoch
            .insert(&epoch, &(last_checkpoint_of_epoch, accumulator))?;
        Ok(())
    }

    /// Test-only helper: writes `object` directly into the `objects` table.
    /// The indirect-object half of the pair is discarded (the `usize::MAX`
    /// threshold keeps the object stored inline).
    pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
        let object_reference = object.compute_object_reference();
        let StoreObjectPair(wrapper, _indirect_object) = get_store_object_pair(object, usize::MAX);
        let mut wb = self.objects.batch();
        wb.insert_batch(
            &self.objects,
            std::iter::once((ObjectKey::from(object_reference), wrapper)),
        )?;
        wb.write()?;
        Ok(())
    }
}
517
impl ObjectStore for AuthorityPerpetualTables {
    /// Read an object and return it, or Ok(None) if the object was not found.
    fn get_object(
        &self,
        object_id: &ObjectID,
    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
        // Seek past the last possible key for this id, then step back one
        // entry to land on the highest stored version (if any).
        let obj_entry = self
            .objects
            .unbounded_iter()
            .skip_prior_to(&ObjectKey::max_for_id(object_id))
            .map_err(iota_types::storage::error::Error::custom)?
            .next();

        match obj_entry {
            // `object()` yields Ok(None) for deleted/wrapped tombstones.
            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
                .object(&ObjectKey(obj_id, version), obj)
                .map_err(iota_types::storage::error::Error::custom)?),
            _ => Ok(None),
        }
    }

    /// Reads the object at the exact (id, version) key; returns `Ok(None)`
    /// when the key is absent or holds a tombstone.
    fn get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
        Ok(self
            .objects
            .get(&ObjectKey(*object_id, version))
            .map_err(iota_types::storage::error::Error::custom)?
            .map(|object| self.object(&ObjectKey(*object_id, version), object))
            .transpose()
            .map_err(iota_types::storage::error::Error::custom)?
            .flatten())
    }
}
554
/// Iterator over the live object set of the `objects` table. For every object
/// ID it yields a single `LiveObject` (skipping deleted/wrapped tombstones);
/// it relies on the underlying iterator visiting keys in ascending
/// (ObjectID, version) order, so the last entry of an ID-group is its
/// highest version.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over `objects`, in key order.
    iter:
        <DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::Iterator,
    /// Needed to reconstruct full objects (including indirect contents).
    tables: &'a AuthorityPerpetualTables,
    /// Most recently seen entry, pending emission at the next ID boundary.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}
561
/// A member of the live object set: either a fully materialized object, or a
/// wrapped object for which only the (id, version) key is available.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}
567
568impl LiveObject {
569    pub fn object_id(&self) -> ObjectID {
570        match self {
571            LiveObject::Normal(obj) => obj.id(),
572            LiveObject::Wrapped(key) => key.0,
573        }
574    }
575
576    pub fn version(&self) -> SequenceNumber {
577        match self {
578            LiveObject::Normal(obj) => obj.version(),
579            LiveObject::Wrapped(key) => key.1,
580        }
581    }
582
583    pub fn object_reference(&self) -> ObjectRef {
584        match self {
585            LiveObject::Normal(obj) => obj.compute_object_reference(),
586            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
587        }
588    }
589
590    pub fn to_normal(self) -> Option<Object> {
591        match self {
592            LiveObject::Normal(object) => Some(object),
593            LiveObject::Wrapped(_) => None,
594        }
595    }
596}
597
598impl LiveSetIter<'_> {
599    fn store_object_wrapper_to_live_object(
600        &self,
601        object_key: ObjectKey,
602        store_object: StoreObjectWrapper,
603    ) -> Option<LiveObject> {
604        match store_object.migrate().into_inner() {
605            StoreObject::Value(object) => {
606                let object = self
607                    .tables
608                    .construct_object(&object_key, object)
609                    .expect("Constructing object from store cannot fail");
610                Some(LiveObject::Normal(object))
611            }
612            StoreObject::Wrapped | StoreObject::Deleted => None,
613        }
614    }
615}
616
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    /// Emits one `LiveObject` per object ID. The last entry seen for an ID is
    /// emitted when iteration moves on to a different ID (or ends); since
    /// keys are ordered by (ObjectID, version), that entry is the highest
    /// version of its object. Tombstoned entries yield nothing and the loop
    /// keeps scanning.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some((next_key, next_value)) = self.iter.next() {
                // Stash the current entry; decide whether the previous one
                // completed an ID-group and should be emitted.
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev {
                    if prev_key.0 != next_key.0 {
                        // ID boundary crossed: `prev` was the latest version
                        // of its object; emit it unless it is a tombstone.
                        let live_object =
                            self.store_object_wrapper_to_live_object(prev_key, prev_value);
                        if live_object.is_some() {
                            return live_object;
                        }
                    }
                }
                continue;
            }
            // Underlying iterator exhausted: flush the final pending entry.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
647
648// These functions are used to initialize the DB tables
649fn live_owned_object_markers_table_config(db_options: DBOptions) -> DBOptions {
650    DBOptions {
651        options: db_options
652            .clone()
653            .optimize_for_write_throughput()
654            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
655            .options,
656        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
657    }
658}
659
660fn objects_table_config(db_options: DBOptions) -> DBOptions {
661    db_options
662        .optimize_for_write_throughput()
663        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
664}
665
666fn transactions_table_config(db_options: DBOptions) -> DBOptions {
667    db_options
668        .optimize_for_write_throughput()
669        .optimize_for_point_lookup(
670            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
671        )
672}
673
674fn effects_table_config(db_options: DBOptions) -> DBOptions {
675    db_options
676        .optimize_for_write_throughput()
677        .optimize_for_point_lookup(
678            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
679        )
680}
681
682fn events_table_config(db_options: DBOptions) -> DBOptions {
683    db_options
684        .optimize_for_write_throughput()
685        .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
686}
687
688fn indirect_move_objects_table_config(mut db_options: DBOptions) -> DBOptions {
689    db_options = db_options
690        .optimize_for_write_throughput()
691        .optimize_for_point_lookup(
692            read_size_from_env(ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(512),
693        );
694    db_options.options.set_merge_operator(
695        "refcount operator",
696        reference_count_merge_operator,
697        reference_count_merge_operator,
698    );
699    db_options
700        .options
701        .set_compaction_filter("empty filter", empty_compaction_filter);
702    db_options
703}