iota_core/rest_index.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, HashMap},
    path::PathBuf,
    sync::{Arc, Mutex},
    time::Instant,
};

use iota_types::{
    base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
    digests::TransactionDigest,
    dynamic_field::visitor as DFV,
    full_checkpoint_content::CheckpointData,
    layout_resolver::LayoutResolver,
    messages_checkpoint::CheckpointContents,
    object::{Object, Owner},
    storage::{
        BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, error::Error as StorageError,
    },
};
use move_core_types::language_storage::StructTag;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
use typed_store::{
    DBMapUtils, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{Map, TableSummary, TypedStoreDebug},
};

use crate::{
    authority::{AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::CheckpointStore,
    par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer},
};

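/// Version of the index schema. Bump this any time a table is added or an
/// existing table's schema changes (see the note on `IndexStoreTables`).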
const CURRENT_DB_VERSION: u64 = 0;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}

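/// Key for the `owner` index.
///
/// Entries are ordered with `owner` as the key prefix, so all objects for a
/// given address are contiguous in the table; `owner_iter` exploits this with
/// a bounded scan from `(owner, cursor)` to `(owner, ObjectID::MAX)`.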
#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: IotaAddress,
    pub object_id: ObjectID,
}

impl OwnerIndexKey {
    fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
        Self { owner, object_id }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // The object_id of the object is part of the key
    pub version: SequenceNumber,
    pub type_: MoveObjectType,
}

impl OwnerIndexInfo {
    pub fn new(object: &Object) -> Self {
        Self {
            version: object.version(),
            type_: object.type_().expect("packages cannot be owned").to_owned(),
        }
    }
}

#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TransactionInfo {
    pub checkpoint: u64,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
}

impl CoinIndexInfo {
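    /// Combines two entries for the same coin type, keeping whichever fields
    /// are already populated. The `CoinMetadata` and `TreasuryCap` objects
    /// are discovered independently, so each contributes one half of the
    /// final record.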
    fn merge(self, other: Self) -> Self {
        Self {
            coin_metadata_object_id: self
                .coin_metadata_object_id
                .or(other.coin_metadata_object_id),
            treasury_object_id: self.treasury_object_id.or(other.treasury_object_id),
        }
    }
}

/// RocksDB tables for the RestIndexStore
///
/// Any time a new table is added, or an existing one has its schema changed,
/// make sure to also update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and reviewers: before adding any new tables, ensure that
/// they are either:
/// - bounded in size by the live object set, or
/// - prunable, with corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still
    ///   be empty post initialization)
    /// - tracking the version of the DB. Every time a table is added or a
    ///   schema is changed, the version number needs to be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from
    /// the main database.
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a
    /// specific user account.
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by
    /// a particular ObjectID.
    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the
    /// ObjectID of its corresponding CoinMetadata.
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
    // NOTE: Authors and reviewers: before adding any new tables, ensure that they are either:
    // - bounded in size by the live object set, or
    // - prunable, with corresponding logic in the `prune` function
}

impl IndexStoreTables {
    fn open<P: Into<PathBuf>>(path: P) -> Self {
        IndexStoreTables::open_tables_read_write(
            path.into(),
            MetricConf::new("rest-index"),
            None,
            None,
        )
    }

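    /// Returns true if the index is uninitialized or was written with a
    /// different schema version and therefore needs to be (re)built.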
    fn needs_to_do_initialization(&self) -> bool {
        match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => true,
            Err(_) => true,
        }
    }

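    /// Returns true only if an existing index has a stale schema version (or
    /// is unreadable) and must be dropped before re-initialization. A fresh,
    /// empty index needs initialization but has nothing to delete.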
    fn needs_to_delete_old_db(&self) -> bool {
        match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => false,
            Err(_) => true,
        }
    }

    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Result<(), StorageError> {
        info!("Initializing REST indexes");

        // Iterate through available, executed checkpoints that have yet to be pruned
        // to initialize checkpoint- and transaction-based indexes.
        if let Some(highest_executed_checkpoint) =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?
        {
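            // Everything up to and including the highest pruned checkpoint is
            // no longer available, so indexing starts at the next sequence
            // number.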
            let lowest_available_checkpoint = checkpoint_store
                .get_highest_pruned_checkpoint_seq_number()?
                .saturating_add(1);

            let checkpoint_range = lowest_available_checkpoint..=highest_executed_checkpoint;

            info!(
                "Indexing {} checkpoints in range {checkpoint_range:?}",
                checkpoint_range.size_hint().0
            );
            let start_time = Instant::now();

            checkpoint_range.into_par_iter().try_for_each(|seq| {
                let checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(seq)?
                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
                let contents = checkpoint_store
                    .get_checkpoint_contents(&checkpoint.content_digest)?
                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;

                let info = TransactionInfo {
                    checkpoint: checkpoint.sequence_number,
                };

                self.transactions
                    .multi_insert(contents.iter().map(|digests| (digests.transaction, info)))
                    .map_err(StorageError::from)
            })?;

            info!(
                "Indexing checkpoints took {} seconds",
                start_time.elapsed().as_secs()
            );
        }

        let coin_index = Mutex::new(HashMap::new());

        let make_live_object_indexer = RestParLiveObjectSetIndexer {
            tables: self,
            coin_index: &coin_index,
            epoch_store,
            package_store,
        };

        crate::par_index_live_object_set::par_index_live_object_set(
            authority_store,
            &make_live_object_indexer,
        )?;

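        // Write out the coin entries accumulated across all parallel workers.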
        self.coin.multi_insert(coin_index.into_inner().unwrap())?;

        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing REST indexes");

        Ok(())
    }

    /// Prune data from this index.
    fn prune(
        &self,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.transactions.batch();

        let transactions_to_prune = checkpoint_contents_to_prune
            .iter()
            .flat_map(|contents| contents.iter().map(|digests| digests.transaction));

        batch.delete_batch(&self.transactions, transactions_to_prune)?;

        batch.write()
    }

    /// Index a checkpoint, returning the staged `DBBatch` without writing it
    /// to the database.
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        resolver: &mut dyn LayoutResolver,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.transactions.batch();

        // transactions index
        {
            let info = TransactionInfo {
                checkpoint: checkpoint.checkpoint_summary.sequence_number,
            };

            batch.insert_batch(
                &self.transactions,
                checkpoint
                    .checkpoint_contents
                    .iter()
                    .map(|digests| (digests.transaction, info)),
            )?;
        }

        // object indexes
        {
            let mut coin_index = HashMap::new();

            for tx in &checkpoint.transactions {
                // determine changes from removed objects
                for removed_object in tx.removed_objects_pre_version() {
                    match removed_object.owner() {
                        Owner::AddressOwner(address) => {
                            let owner_key = OwnerIndexKey::new(*address, removed_object.id());
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }
                        Owner::ObjectOwner(object_id) => {
                            batch.delete_batch(
                                &self.dynamic_field,
                                [DynamicFieldKey::new(*object_id, removed_object.id())],
                            )?;
                        }
                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // determine changes from changed objects
                for (object, old_object) in tx.changed_objects() {
                    if let Some(old_object) = old_object {
                        if old_object.owner() != object.owner() {
                            match old_object.owner() {
                                Owner::AddressOwner(address) => {
                                    let owner_key = OwnerIndexKey::new(*address, old_object.id());
                                    batch.delete_batch(&self.owner, [owner_key])?;
                                }

                                Owner::ObjectOwner(object_id) => {
                                    batch.delete_batch(
                                        &self.dynamic_field,
                                        [DynamicFieldKey::new(*object_id, old_object.id())],
                                    )?;
                                }

                                Owner::Shared { .. } | Owner::Immutable => {}
                            }
                        }
                    }

                    match object.owner() {
                        Owner::AddressOwner(owner) => {
                            let owner_key = OwnerIndexKey::new(*owner, object.id());
                            let owner_info = OwnerIndexInfo::new(object);
                            batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                        }
                        Owner::ObjectOwner(parent) => {
                            if let Some(field_info) =
                                try_create_dynamic_field_info(object, resolver)
                                    .ok()
                                    .flatten()
                            {
                                let field_key = DynamicFieldKey::new(*parent, object.id());

                                batch
                                    .insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
                            }
                        }
                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // coin indexing
                //
                // Coin indexing relies on the fact that the CoinMetadata and
                // TreasuryCap for a coin type are created in the same
                // transaction, so we don't need to worry about overwriting an
                // older value that may exist in the database (there
                // necessarily cannot be one).
                for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                    use std::collections::hash_map::Entry;

                    match coin_index.entry(key) {
                        Entry::Occupied(o) => {
                            let (key, v) = o.remove_entry();
                            let value = value.merge(v);
                            batch.insert_batch(&self.coin, [(key, value)])?;
                        }
                        Entry::Vacant(v) => {
                            v.insert(value);
                        }
                    }
                }
            }

            batch.insert_batch(&self.coin, coin_index)?;
        }

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }

    fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.transactions.get(digest)
    }

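    /// Scans the contiguous run of keys prefixed by `owner`, starting at
    /// `cursor` (inclusive) when one is provided.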
    fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
        let lower_bound = OwnerIndexKey::new(owner, cursor.unwrap_or(ObjectID::ZERO));
        let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
        Ok(self
            .owner
            .iter_with_bounds(Some(lower_bound), Some(upper_bound)))
    }

    fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
    {
        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
        let iter = self
            .dynamic_field
            .iter_with_bounds(Some(lower_bound), Some(upper_bound));
        Ok(iter)
    }

    fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        let key = CoinIndexKey {
            coin_type: coin_type.to_owned(),
        };
        self.coin.get(&key)
    }
}

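/// Write path: `index_checkpoint` stages one `DBBatch` per checkpoint in
/// `pending_updates`, and `commit_update_for_checkpoint` writes those batches
/// to the DB strictly in sequence-number order.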
pub struct RestIndexStore {
    tables: IndexStoreTables,
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
}

impl RestIndexStore {
    pub fn new(
        path: PathBuf,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    ) -> Self {
        let tables = {
            let tables = IndexStoreTables::open(&path);

            // If the index tables are uninitialized or on an older version then we need to
            // populate them
            if tables.needs_to_do_initialization() {
                let mut tables = if tables.needs_to_delete_old_db() {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone())
                        .expect("unable to destroy old rest-index db");
                    IndexStoreTables::open(path)
                } else {
                    tables
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                    )
                    .expect("unable to initialize rest index from live object set");
                tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
        }
    }

    pub fn new_without_init(path: PathBuf) -> Self {
        let tables = IndexStoreTables::open(path);

        Self {
            tables,
            pending_updates: Default::default(),
        }
    }

    pub fn prune(
        &self,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables.prune(checkpoint_contents_to_prune)
    }

    /// Index a checkpoint and stage the index updates in `pending_updates`.
    ///
    /// Updates will not be committed to the database until
    /// `commit_update_for_checkpoint` is called.
    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self
            .tables
            .index_checkpoint(checkpoint, resolver)
            .expect("db error");

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);
    }

    /// Commits the pending updates for the provided checkpoint number.
    ///
    /// Invariants:
    /// - `index_checkpoint` must have been called for the provided checkpoint
    /// - Callers of this function must ensure that it is called for each
    ///   checkpoint in sequential order. This will panic if the provided
    ///   checkpoint does not match the expected next checkpoint to commit.
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        // It's expected that the next batch exists
        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }

    pub fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.tables.get_transaction_info(digest)
    }

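    /// Iterates over the objects owned by `owner`, resuming from `cursor`
    /// (inclusive) when one is provided.
    ///
    /// A minimal pagination sketch (illustrative only; `index`, `owner`, and
    /// the page size of 50 are assumptions of this example):
    ///
    /// ```ignore
    /// let mut cursor = None;
    /// loop {
    ///     // Fetch one page. Because the lower bound is inclusive, the
    ///     // cursor entry itself is re-yielded, so skip it on later pages.
    ///     let page: Vec<_> = index
    ///         .owner_iter(owner, cursor)?
    ///         .skip(usize::from(cursor.is_some()))
    ///         .take(50)
    ///         .collect();
    ///     match page.last() {
    ///         Some((key, _info)) => cursor = Some(key.object_id),
    ///         None => break,
    ///     }
    /// }
    /// ```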
    pub fn owner_iter(
        &self,
        owner: IotaAddress,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
        self.tables.owner_iter(owner, cursor)
    }

    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }
}

fn try_create_dynamic_field_info(
    object: &Object,
    resolver: &mut dyn LayoutResolver,
) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
    // Skip if not a move object
    let Some(move_object) = object.data.try_as_move() else {
        return Ok(None);
    };

    // Skip any objects that aren't of type `Field<Name, Value>`
    //
    // All dynamic fields are of type:
    //   - Field<Name, Value> for dynamic fields
    //   - Field<Wrapper<Name>, ID> for dynamic object fields, where the ID is
    //     the id of the pointed-to object
    //
    if !move_object.type_().is_dynamic_field() {
        return Ok(None);
    }

    let layout = resolver
        .get_annotated_layout(&move_object.type_().clone().into())
        .map_err(StorageError::custom)?
        .into_layout();

    let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
        .map_err(StorageError::custom)?;

    let value_metadata = field.value_metadata().map_err(StorageError::custom)?;

    Ok(Some(DynamicFieldIndexInfo {
        name_type: field.name_layout.into(),
        name_value: field.name_bytes.to_owned(),
        dynamic_field_type: field.kind,
        dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
            Some(id)
        } else {
            None
        },
    }))
}

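/// If `object` is a `CoinMetadata<T>` or a `TreasuryCap<T>`, returns a coin
/// index entry recording this object's ID under coin type `T`; otherwise
/// returns `None`.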
fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
    use iota_types::coin::{CoinMetadata, TreasuryCap};

    object
        .type_()
        .and_then(MoveObjectType::other)
        .and_then(|object_type| {
            CoinMetadata::is_coin_metadata_with_coin_type(object_type)
                .cloned()
                .map(|coin_type| {
                    (
                        CoinIndexKey { coin_type },
                        CoinIndexInfo {
                            coin_metadata_object_id: Some(object.id()),
                            treasury_object_id: None,
                        },
                    )
                })
                .or_else(|| {
                    TreasuryCap::is_treasury_with_coin_type(object_type)
                        .cloned()
                        .map(|coin_type| {
                            (
                                CoinIndexKey { coin_type },
                                CoinIndexInfo {
                                    coin_metadata_object_id: None,
                                    treasury_object_id: Some(object.id()),
                                },
                            )
                        })
                })
        })
}

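/// Factory handed to `par_index_live_object_set` during initialization: it
/// produces one `RestLiveObjectIndexer` per worker, each with its own write
/// batch and layout resolver, all sharing the coin index behind a mutex.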
struct RestParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    epoch_store: &'a AuthorityPerEpochStore,
    package_store: &'a Arc<dyn BackingPackageStore + Send + Sync>,
}

struct RestLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    batch: typed_store::rocks::DBBatch,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    resolver: Box<dyn LayoutResolver + 'a>,
}

impl<'a> ParMakeLiveObjectIndexer for RestParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RestLiveObjectIndexer<'a>;

    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RestLiveObjectIndexer {
            tables: self.tables,
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            resolver: self
                .epoch_store
                .executor()
                .type_layout_resolver(Box::new(self.package_store)),
        }
    }
}

impl LiveObjectIndexer for RestLiveObjectIndexer<'_> {
    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
        match object.owner {
            // Owner Index
            Owner::AddressOwner(owner) => {
                let owner_key = OwnerIndexKey::new(owner, object.id());
                let owner_info = OwnerIndexInfo::new(&object);
                self.batch
                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
            }

            // Dynamic Field Index
            Owner::ObjectOwner(parent) => {
                if let Some(field_info) =
                    try_create_dynamic_field_info(&object, self.resolver.as_mut())?
                {
                    let field_key = DynamicFieldKey::new(parent, object.id());

                    self.batch
                        .insert_batch(&self.tables.dynamic_field, [(field_key, field_info)])?;
                }
            }

            Owner::Shared { .. } | Owner::Immutable => {}
        }

        // Look for CoinMetadata<T> and TreasuryCap<T> objects
        if let Some((key, value)) = try_create_coin_index_info(&object) {
            use std::collections::hash_map::Entry;

            match self.coin_index.lock().unwrap().entry(key) {
                Entry::Occupied(o) => {
                    let (key, v) = o.remove_entry();
                    let value = value.merge(v);
                    self.batch.insert_batch(&self.tables.coin, [(key, value)])?;
                }
                Entry::Vacant(v) => {
                    v.insert(value);
                }
            }
        }

        // If the batch grows larger than 128 MiB (1 << 27 bytes), write it out
        // to the DB so that the data we need to hold in memory doesn't grow
        // unbounded.
        if self.batch.size_in_bytes() >= 1 << 27 {
            std::mem::replace(&mut self.batch, self.tables.owner.batch()).write()?;
        }

        Ok(())
    }

    fn finish(self) -> Result<(), StorageError> {
        self.batch.write()?;
        Ok(())
    }
}