iota_core/authority/
authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::Path;
6
7use iota_types::{
8    base_types::SequenceNumber,
9    digests::TransactionEventsDigest,
10    effects::{TransactionEffects, TransactionEvents},
11    global_state_hash::GlobalStateHash,
12    storage::MarkerValue,
13};
14use serde::{Deserialize, Serialize};
15use tracing::error;
16use typed_store::{
17    DBMapUtils, DbIterator,
18    metrics::SamplingInterval,
19    rocks::{
20        DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
21        read_size_from_env,
22    },
23    rocksdb::compaction_filter::Decision,
24    traits::Map,
25};
26
27use super::*;
28use crate::authority::{
29    authority_store_pruner::ObjectsCompactionFilter,
30    authority_store_types::{
31        StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
32    },
33    epoch_start_configuration::EpochStartConfiguration,
34};
35
// Environment variables (values in MB) that size the RocksDB block caches of
// the corresponding column families; read via `read_size_from_env` in the
// `*_table_config` helpers at the bottom of this file.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
41
/// Options to apply to every column family of the `perpetual` DB.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    pub enable_write_stall: bool,
    /// Optional compaction filter installed on the `objects` column family
    /// when the DB is opened (see `objects_table_config`).
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}
49
50impl AuthorityPerpetualTablesOptions {
51    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
52        if !self.enable_write_stall {
53            db_options = db_options.disable_write_throttling();
54        }
55        db_options
56    }
57}
58
/// AuthorityPerpetualTables contains data that must be preserved from one epoch
/// to the next.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of
    /// the object, namely the state that is needed to process new
    /// transactions. State is represented by `StoreObject` enum, which is
    /// either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will
    /// eventually prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as
    /// inputs in some TransactionEffects. Simply pruning all objects but
    /// the most recent is an error! This is because there can be partially
    /// executed transactions whose effects have not yet been written out,
    /// and which must be retried. But, they cannot be retried unless their
    /// input objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Object references of currently active objects that can be mutated.
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// This is a map between the transaction digest and the corresponding
    /// transaction that's known to be executable. This means that it may
    /// have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of
    /// its execution. We store effects into this table in two different
    /// cases:
    /// 1. When a transaction is synced through state_sync, we store the effects
    ///    here. These effects are known to be final in the network, but may not
    ///    have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the
    ///    effects here. This means that it's possible to store the same effects
    ///    twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction
    /// didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this
    /// table since the `effects` table doesn't say anything about the
    /// execution status of the transaction on this node. When we wait for
    /// transactions to be executed, we wait for them to appear in this
    /// table. When we revert transactions, we remove them from both tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events, keyed by the digest of the events batch and the index of the
    /// event within that batch.
    ///
    /// Currently this is needed in the validator for returning events during
    /// process certificates. We could potentially remove this if we decided
    /// not to provide events in the execution path.
    /// TODO: Figure out what to do with this table in the long run.
    /// Also we need a pruning policy for this table. We can prune this table
    /// along with tx/effects.
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Epoch and checkpoint of transactions finalized by checkpoint
    /// executor. Currently, mainly used to implement JSON RPC `ReadApi`.
    /// Note, there is a table with the same name in
    /// `AuthorityEpochTables`/`AuthorityPerEpochStore`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Finalized root state hash for epoch, to be included in
    /// CheckpointSummary of last checkpoint of epoch. These values should
    /// only ever be written once and never changed.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores latest pruned checkpoint. Used to keep
    /// objects pruner progress
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// The total IOTA supply and the epoch at which it was stored.
    /// We check and update it at the end of each epoch if expensive checks are
    /// enabled.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Expected imbalance between storage fund balance and the sum of storage
    /// rebate of all live objects. This could be non-zero due to bugs in
    /// earlier protocol versions. This number is the result of
    /// storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and
    /// the version at which they were received. This is used to prevent
    /// possible race conditions around receiving objects (since they are
    /// not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned
    /// per-epoch, and all previous epochs other than the current epoch may
    /// be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
158
/// Tables used by the objects pruner, stored in a separate `pruner` DB next
/// to the perpetual one.
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Tombstone sequence number recorded per object id.
    // NOTE(review): presumably the version at which the object was last
    // deleted/wrapped — confirm against `ObjectsCompactionFilter` usage.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
163
164impl AuthorityPrunerTables {
165    pub fn path(parent_path: &Path) -> PathBuf {
166        parent_path.join("pruner")
167    }
168
169    pub fn open(parent_path: &Path) -> Self {
170        Self::open_tables_read_write(
171            Self::path(parent_path),
172            MetricConf::new("pruner")
173                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
174            None,
175            None,
176        )
177    }
178}
179
/// The total IOTA supply used during conservation checks.
///
/// Stored in the singleton `total_iota_supply` table of
/// `AuthorityPerpetualTables`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// The IOTA supply at the time of `last_check_epoch`.
    pub(crate) total_supply: u64,
    /// The epoch at which the total supply was last checked or updated.
    pub(crate) last_check_epoch: EpochId,
}
188
impl AuthorityPerpetualTables {
    /// Returns the on-disk location of the `perpetual` DB under `parent_path`.
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("perpetual")
    }

    /// Opens the perpetual tables read-write.
    ///
    /// `db_options_override` (defaults when `None`) is applied on top of
    /// write-throughput-optimized base options; dedicated per-table configs
    /// are installed for the hot column families (objects, owned-object
    /// markers, transactions, effects, events).
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
    ) -> Self {
        let db_options_override = db_options_override.unwrap_or_default();
        let db_options =
            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "objects".to_string(),
                objects_table_config(db_options.clone(), db_options_override.compaction_filter),
            ),
            (
                "live_owned_object_markers".to_string(),
                live_owned_object_markers_table_config(db_options.clone()),
            ),
            (
                "transactions".to_string(),
                transactions_table_config(db_options.clone()),
            ),
            (
                "effects".to_string(),
                effects_table_config(db_options.clone()),
            ),
            (
                "events".to_string(),
                events_table_config(db_options.clone()),
            ),
        ]));
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            Some(db_options.options),
            Some(table_options),
        )
    }

    /// Opens a read-only handle to the perpetual tables (no per-table
    /// config overrides).
    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
        Self::get_read_only_handle(
            Self::path(parent_path),
            None,
            None,
            MetricConf::new("perpetual_readonly"),
        )
    }

    // This is used by indexer to find the correct version of dynamic field
    // child object. We do not store the version of the child object, but
    // because of lamport timestamp, we know the child must have version
    // number less than or eq to the parent.
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>> {
        // Iterate versions of `object_id` downwards starting at `version`,
        // so the first hit is the newest version <= `version`.
        let mut iter = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey(object_id, version)),
        )?;
        match iter.next() {
            Some(Ok((key, o))) => self.object(&key, o),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }

    /// Deserializes a raw `StoreObjectValue` into a full `Object` for
    /// `object_key`.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, IotaError> {
        try_construct_object(object_key, store_object)
    }

    // Constructs `iota_types::object::Object` from `StoreObjectWrapper`.
    // Returns `None` if object was deleted/wrapped
    pub fn object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<Option<Object>, IotaError> {
        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
            return Ok(None);
        };
        Ok(Some(self.construct_object(object_key, *store_object)?))
    }

    /// Computes the `ObjectRef` of a stored entry; deleted/wrapped tombstones
    /// map to the special `OBJECT_DELETED`/`OBJECT_WRAPPED` digests.
    pub fn object_reference(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<ObjectRef, IotaError> {
        let obj_ref = match store_object.migrate().into_inner() {
            StoreObject::Value(object) => self
                .construct_object(object_key, *object)?
                .compute_object_reference(),
            StoreObject::Deleted => {
                ObjectRef::new(object_key.0, object_key.1, ObjectDigest::OBJECT_DELETED)
            }
            StoreObject::Wrapped => {
                ObjectRef::new(object_key.0, object_key.1, ObjectDigest::OBJECT_WRAPPED)
            }
        };
        Ok(obj_ref)
    }

    /// Returns the tombstone `ObjectRef` for deleted/wrapped entries, or
    /// `None` when the entry holds a live object value.
    pub fn tombstone_reference(
        &self,
        object_key: &ObjectKey,
        store_object: &StoreObjectWrapper,
    ) -> Result<Option<ObjectRef>, IotaError> {
        let obj_ref = match store_object.inner() {
            StoreObject::Deleted => Some(ObjectRef::new(
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DELETED,
            )),
            StoreObject::Wrapped => Some(ObjectRef::new(
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_WRAPPED,
            )),
            _ => None,
        };
        Ok(obj_ref)
    }

    /// Returns the `ObjectRef` of the highest stored version of `object_id`,
    /// including tombstone references; `None` if the id is unknown.
    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, IotaError> {
        // Reverse iteration within the id's key range: the first entry is
        // the highest version.
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some(self.object_reference(&object_key, value)?));
            }
        }
        Ok(None)
    }

    /// Returns the raw (key, wrapper) entry of the highest stored version of
    /// `object_id`, including tombstones; `None` if the id is unknown.
    pub fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next() {
            if object_key.0 == object_id {
                return Ok(Some((object_key, value)));
            }
        }
        Ok(None)
    }

    /// Reads the current epoch from the stored epoch start configuration.
    ///
    /// # Panics
    /// Panics if no epoch start configuration has been written yet.
    pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
        Ok(self
            .epoch_start_configuration
            .get(&())?
            .expect("Must have current epoch.")
            .epoch_start_state()
            .epoch())
    }

    /// Atomically overwrites the singleton epoch start configuration.
    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> IotaResult {
        let mut wb = self.epoch_start_configuration.batch();
        wb.insert_batch(
            &self.epoch_start_configuration,
            std::iter::once(((), epoch_start_configuration)),
        )?;
        wb.write()?;
        Ok(())
    }

    /// Returns the last checkpoint recorded by the objects pruner, if any.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }

    /// Records pruner progress into the caller-provided batch `wb`; the
    /// caller is responsible for committing it.
    pub fn set_highest_pruned_checkpoint(
        &self,
        wb: &mut DBBatch,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
        Ok(())
    }

    /// Looks up an executable transaction by digest.
    pub fn get_transaction(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TrustedTransaction>> {
        let Some(transaction) = self.transactions.get(digest)? else {
            return Ok(None);
        };
        Ok(Some(transaction))
    }

    /// Returns the effects of a locally executed transaction: resolves the
    /// effects digest via `executed_effects`, then loads from `effects`.
    /// `None` when the transaction has not been executed on this node.
    pub fn get_effects(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<TransactionEffects>> {
        let Some(effect_digest) = self.executed_effects.get(digest)? else {
            return Ok(None);
        };
        Ok(self.effects.get(&effect_digest)?)
    }

    /// Returns the (epoch, checkpoint) in which `digest` was finalized, if
    /// the transaction has been checkpointed.
    pub fn get_checkpoint_sequence_number(
        &self,
        digest: &TransactionDigest,
    ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
    }

    /// Lists all stored versions of `object.0` strictly newer than
    /// `object.1`.
    pub fn get_newer_object_keys(
        &self,
        object: &(ObjectID, SequenceNumber),
    ) -> IotaResult<Vec<ObjectKey>> {
        let mut objects = vec![];
        // Scan from the next version after `object.1` up to the maximum
        // valid version for this id.
        for result in self.objects.safe_iter_with_bounds(
            Some(ObjectKey(object.0, object.1.next().unwrap())),
            Some(ObjectKey(object.0, VersionNumber::MAX_VALID_EXCL)),
        ) {
            let (key, _) = result?;
            objects.push(key);
        }
        Ok(objects)
    }

    /// Convenience wrapper around `set_highest_pruned_checkpoint` that
    /// creates and commits its own write batch.
    pub fn set_highest_pruned_checkpoint_without_wb(
        &self,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IotaResult {
        let mut wb = self.pruned_checkpoint.batch();
        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        wb.write()?;
        Ok(())
    }

    /// Returns true when the objects table contains no entries at all.
    pub fn database_is_empty(&self) -> IotaResult<bool> {
        Ok(self.objects.safe_iter().next().is_none())
    }

    /// Iterates the live object set over the entire objects table.
    pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
        LiveSetIter {
            iter: Box::new(self.objects.safe_iter()),
            tables: self,
            prev: None,
        }
    }

    /// Iterates the live object set restricted to the given object-id range;
    /// `None` on either side means unbounded.
    pub fn range_iter_live_object_set(
        &self,
        lower_bound: Option<ObjectID>,
        upper_bound: Option<ObjectID>,
    ) -> LiveSetIter<'_> {
        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);

        LiveSetIter {
            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
            tables: self,
            prev: None,
        }
    }

    /// Creates a RocksDB checkpoint of the whole perpetual DB at `path`.
    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        // This checkpoints the entire db and not just objects table
        self.objects.checkpoint_db(path).map_err(Into::into)
    }

    /// Returns the finalized root state hash recorded for `epoch`, if any.
    pub fn get_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
    }

    /// Records the root state hash for `epoch`. Values in this table are
    /// expected to be written once and never changed.
    pub fn insert_root_state_hash(
        &self,
        epoch: EpochId,
        last_checkpoint_of_epoch: CheckpointSequenceNumber,
        hash: GlobalStateHash,
    ) -> IotaResult {
        self.root_state_hash_by_epoch
            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
        Ok(())
    }

    /// Test-only helper: writes `object` directly into the objects table,
    /// keyed by its computed object reference.
    pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
        let object_reference = object.compute_object_reference();
        let wrapper = get_store_object(object);
        let mut wb = self.objects.batch();
        wb.insert_batch(
            &self.objects,
            std::iter::once((ObjectKey::from(object_reference), wrapper)),
        )?;
        wb.write()?;
        Ok(())
    }
}
507
508impl ObjectStore for AuthorityPerpetualTables {
509    /// Read an object and return it, or Ok(None) if the object was not found.
510    fn try_get_object(
511        &self,
512        object_id: &ObjectID,
513    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
514        let obj_entry = self
515            .objects
516            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))
517            .map_err(iota_types::storage::error::Error::custom)?
518            .next();
519
520        match obj_entry.transpose()? {
521            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
522                .object(&ObjectKey(obj_id, version), obj)
523                .map_err(iota_types::storage::error::Error::custom)?),
524            _ => Ok(None),
525        }
526    }
527
528    fn try_get_object_by_key(
529        &self,
530        object_id: &ObjectID,
531        version: VersionNumber,
532    ) -> Result<Option<Object>, iota_types::storage::error::Error> {
533        Ok(self
534            .objects
535            .get(&ObjectKey(*object_id, version))
536            .map_err(iota_types::storage::error::Error::custom)?
537            .map(|object| self.object(&ObjectKey(*object_id, version), object))
538            .transpose()
539            .map_err(iota_types::storage::error::Error::custom)?
540            .flatten())
541    }
542}
543
/// Iterator over the live object set: yields one `LiveObject` per object id,
/// built from the highest version stored for that id, skipping ids whose
/// latest entry is a deleted/wrapped tombstone.
pub struct LiveSetIter<'a> {
    // Underlying ascending (id, version)-ordered iterator over the objects
    // table, possibly bounded to an id range.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    // Backing tables, needed to reconstruct full `Object`s from raw entries.
    tables: &'a AuthorityPerpetualTables,
    // Most recently seen entry; buffered until a different object id (or the
    // end of iteration) proves it is the latest version of its id.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}
549
/// An entry of the live object set: either a fully materialized object or
/// the key of an object that is currently wrapped.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}
555
556impl LiveObject {
557    pub fn object_id(&self) -> ObjectID {
558        match self {
559            LiveObject::Normal(obj) => obj.id(),
560            LiveObject::Wrapped(key) => key.0,
561        }
562    }
563
564    pub fn version(&self) -> SequenceNumber {
565        match self {
566            LiveObject::Normal(obj) => obj.version(),
567            LiveObject::Wrapped(key) => key.1,
568        }
569    }
570
571    pub fn object_reference(&self) -> ObjectRef {
572        match self {
573            LiveObject::Normal(obj) => obj.compute_object_reference(),
574            LiveObject::Wrapped(key) => ObjectRef::new(key.0, key.1, ObjectDigest::OBJECT_WRAPPED),
575        }
576    }
577
578    pub fn to_normal(self) -> Option<Object> {
579        match self {
580            LiveObject::Normal(object) => Some(object),
581            LiveObject::Wrapped(_) => None,
582        }
583    }
584}
585
586impl LiveSetIter<'_> {
587    fn store_object_wrapper_to_live_object(
588        &self,
589        object_key: ObjectKey,
590        store_object: StoreObjectWrapper,
591    ) -> Option<LiveObject> {
592        match store_object.migrate().into_inner() {
593            StoreObject::Value(object) => {
594                let object = self
595                    .tables
596                    .construct_object(&object_key, *object)
597                    .expect("Constructing object from store cannot fail");
598                Some(LiveObject::Normal(object))
599            }
600            StoreObject::Wrapped | StoreObject::Deleted => None,
601        }
602    }
603}
604
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    /// Yields the latest version of each object id.
    ///
    /// The underlying iterator is ordered ascending by (id, version), so the
    /// last entry seen for an id is its highest version. `prev` buffers the
    /// current candidate; it is emitted when the object id changes (or when
    /// the underlying iterator ends), unless it converts to `None` because
    /// it is a wrapped/deleted tombstone.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                // Swap the buffered candidate with the entry just read.
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev {
                    if prev_key.0 != next_key.0 {
                        // Id changed: `prev` was the newest version of its id.
                        let live_object =
                            self.store_object_wrapper_to_live_object(prev_key, prev_value);
                        if live_object.is_some() {
                            return live_object;
                        }
                    }
                }
                continue;
            }
            // Underlying iterator exhausted (an `Err` item is also treated as
            // end-of-stream here): flush the final buffered entry.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
635
636// These functions are used to initialize the DB tables
637fn live_owned_object_markers_table_config(db_options: DBOptions) -> DBOptions {
638    DBOptions {
639        options: db_options
640            .clone()
641            .optimize_for_write_throughput()
642            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
643            .options,
644        rw_options: db_options.rw_options,
645    }
646}
647
648fn objects_table_config(
649    mut db_options: DBOptions,
650    compaction_filter: Option<ObjectsCompactionFilter>,
651) -> DBOptions {
652    if let Some(mut compaction_filter) = compaction_filter {
653        db_options
654            .options
655            .set_compaction_filter("objects", move |_, key, value| {
656                match compaction_filter.filter(key, value) {
657                    Ok(decision) => decision,
658                    Err(err) => {
659                        error!("Compaction error: {:?}", err);
660                        Decision::Keep
661                    }
662                }
663            });
664    }
665    db_options
666        .optimize_for_write_throughput()
667        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
668}
669
670fn transactions_table_config(db_options: DBOptions) -> DBOptions {
671    db_options
672        .optimize_for_write_throughput()
673        .optimize_for_point_lookup(
674            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
675        )
676}
677
678fn effects_table_config(db_options: DBOptions) -> DBOptions {
679    db_options
680        .optimize_for_write_throughput()
681        .optimize_for_point_lookup(
682            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
683        )
684}
685
686fn events_table_config(db_options: DBOptions) -> DBOptions {
687    db_options
688        .optimize_for_write_throughput()
689        .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
690}