iota_core/
rest_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    path::PathBuf,
8    sync::{Arc, Mutex},
9    time::Instant,
10};
11
12use iota_types::{
13    base_types::{IotaAddress, MoveObjectType, ObjectID, SequenceNumber},
14    digests::TransactionDigest,
15    dynamic_field::visitor as DFV,
16    full_checkpoint_content::CheckpointData,
17    layout_resolver::LayoutResolver,
18    messages_checkpoint::CheckpointContents,
19    object::{Object, Owner},
20    storage::{
21        BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, error::Error as StorageError,
22    },
23};
24use move_core_types::language_storage::StructTag;
25use rayon::iter::{IntoParallelIterator, ParallelIterator};
26use serde::{Deserialize, Serialize};
27use tracing::{debug, info};
28use typed_store::{
29    DBMapUtils, TypedStoreError,
30    rocks::{DBMap, MetricConf},
31    traits::{Map, TableSummary, TypedStoreDebug},
32};
33
34use crate::{
35    authority::{AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore},
36    checkpoints::CheckpointStore,
37    par_index_live_object_set::{LiveObjectIndexer, ParMakeLiveObjectIndexer},
38};
39
40const CURRENT_DB_VERSION: u64 = 0;
41
42#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
43struct MetadataInfo {
44    /// Version of the Database
45    version: u64,
46}
47
48#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
49pub struct OwnerIndexKey {
50    pub owner: IotaAddress,
51    pub object_id: ObjectID,
52}
53
54impl OwnerIndexKey {
55    fn new(owner: IotaAddress, object_id: ObjectID) -> Self {
56        Self { owner, object_id }
57    }
58}
59
60#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
61pub struct OwnerIndexInfo {
62    // object_id of the object is a part of the Key
63    pub version: SequenceNumber,
64    pub type_: MoveObjectType,
65}
66
67impl OwnerIndexInfo {
68    pub fn new(object: &Object) -> Self {
69        Self {
70            version: object.version(),
71            type_: object.type_().expect("packages cannot be owned").to_owned(),
72        }
73    }
74}
75
76#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
77pub struct TransactionInfo {
78    pub checkpoint: u64,
79}
80
81#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
82pub struct CoinIndexKey {
83    coin_type: StructTag,
84}
85
86#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
87pub struct CoinIndexInfo {
88    pub coin_metadata_object_id: Option<ObjectID>,
89    pub treasury_object_id: Option<ObjectID>,
90}
91
92impl CoinIndexInfo {
93    fn merge(self, other: Self) -> Self {
94        Self {
95            coin_metadata_object_id: self
96                .coin_metadata_object_id
97                .or(other.coin_metadata_object_id),
98            treasury_object_id: self.treasury_object_id.or(other.treasury_object_id),
99        }
100    }
101}
102
103/// RocksDB tables for the RestIndexStore
104///
105/// Anytime a new table is added, or and existing one has it's schema changed,
106/// make sure to also update the value of `CURRENT_DB_VERSION`.
107///
108/// NOTE: Authors and Reviewers before adding any new tables ensure that they
109/// are either:
110/// - bounded in size by the live object set
111/// - are prune-able and have corresponding logic in the `prune` function
112#[derive(DBMapUtils)]
113struct IndexStoreTables {
114    /// A singleton that store metadata information on the DB.
115    ///
116    /// A few uses for this singleton:
117    /// - determining if the DB has been initialized (as some tables will still
118    ///   be empty post initialization)
119    /// - version of the DB. Everytime a new table or schema is changed the
120    ///   version number needs to be incremented.
121    meta: DBMap<(), MetadataInfo>,
122
123    /// An index of extra metadata for Transactions.
124    ///
125    /// Only contains entries for transactions which have yet to be pruned from
126    /// the main database.
127    transactions: DBMap<TransactionDigest, TransactionInfo>,
128
129    /// An index of object ownership.
130    ///
131    /// Allows an efficient iterator to list all objects currently owned by a
132    /// specific user account.
133    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,
134
135    /// An index of dynamic fields (children objects).
136    ///
137    /// Allows an efficient iterator to list all of the dynamic fields owned by
138    /// a particular ObjectID.
139    dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,
140
141    /// An index of Coin Types
142    ///
143    /// Allows looking up information related to published Coins, like the
144    /// ObjectID of its coorisponding CoinMetadata.
145    coin: DBMap<CoinIndexKey, CoinIndexInfo>,
146    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
147    // - bounded in size by the live object set
148    // - are prune-able and have corresponding logic in the `prune` function
149}
150
151impl IndexStoreTables {
152    fn open<P: Into<PathBuf>>(path: P) -> Self {
153        IndexStoreTables::open_tables_read_write(
154            path.into(),
155            MetricConf::new("rest-index"),
156            None,
157            None,
158        )
159    }
160
161    fn needs_to_do_initialization(&self) -> bool {
162        match self.meta.get(&()) {
163            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
164            Ok(None) => true,
165            Err(_) => true,
166        }
167    }
168
169    fn needs_to_delete_old_db(&self) -> bool {
170        match self.meta.get(&()) {
171            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
172            Ok(None) => false,
173            Err(_) => true,
174        }
175    }
176
177    fn init(
178        &mut self,
179        authority_store: &AuthorityStore,
180        checkpoint_store: &CheckpointStore,
181        epoch_store: &AuthorityPerEpochStore,
182        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
183    ) -> Result<(), StorageError> {
184        info!("Initializing REST indexes");
185
186        // Iterate through available, executed checkpoints that have yet to be pruned
187        // to initialize checkpoint and transaction based indexes.
188        if let Some(highest_executed_checkpoint) =
189            checkpoint_store.get_highest_executed_checkpoint_seq_number()?
190        {
191            let lowest_available_checkpoint = checkpoint_store
192                .get_highest_pruned_checkpoint_seq_number()?
193                .saturating_add(1);
194
195            let checkpoint_range = lowest_available_checkpoint..=highest_executed_checkpoint;
196
197            info!(
198                "Indexing {} checkpoints in range {checkpoint_range:?}",
199                checkpoint_range.size_hint().0
200            );
201            let start_time = Instant::now();
202
203            checkpoint_range.into_par_iter().try_for_each(|seq| {
204                let checkpoint = checkpoint_store
205                    .get_checkpoint_by_sequence_number(seq)?
206                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
207                let contents = checkpoint_store
208                    .get_checkpoint_contents(&checkpoint.content_digest)?
209                    .ok_or_else(|| StorageError::missing(format!("missing checkpoint {seq}")))?;
210
211                let info = TransactionInfo {
212                    checkpoint: checkpoint.sequence_number,
213                };
214
215                self.transactions
216                    .multi_insert(contents.iter().map(|digests| (digests.transaction, info)))
217                    .map_err(StorageError::from)
218            })?;
219
220            info!(
221                "Indexing checkpoints took {} seconds",
222                start_time.elapsed().as_secs()
223            );
224        }
225
226        let coin_index = Mutex::new(HashMap::new());
227
228        let make_live_object_indexer = RestParLiveObjectSetIndexer {
229            tables: self,
230            coin_index: &coin_index,
231            epoch_store,
232            package_store,
233        };
234
235        crate::par_index_live_object_set::par_index_live_object_set(
236            authority_store,
237            &make_live_object_indexer,
238        )?;
239
240        self.coin.multi_insert(coin_index.into_inner().unwrap())?;
241
242        self.meta.insert(
243            &(),
244            &MetadataInfo {
245                version: CURRENT_DB_VERSION,
246            },
247        )?;
248
249        info!("Finished initializing REST indexes");
250
251        Ok(())
252    }
253
254    /// Prune data from this Index
255    fn prune(
256        &self,
257        checkpoint_contents_to_prune: &[CheckpointContents],
258    ) -> Result<(), TypedStoreError> {
259        let mut batch = self.transactions.batch();
260
261        let transactions_to_prune = checkpoint_contents_to_prune
262            .iter()
263            .flat_map(|contents| contents.iter().map(|digests| digests.transaction));
264
265        batch.delete_batch(&self.transactions, transactions_to_prune)?;
266
267        batch.write()
268    }
269
270    /// Index a Checkpoint
271    fn index_checkpoint(
272        &self,
273        checkpoint: &CheckpointData,
274        resolver: &mut dyn LayoutResolver,
275    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
276        debug!(
277            checkpoint = checkpoint.checkpoint_summary.sequence_number,
278            "indexing checkpoint"
279        );
280
281        let mut batch = self.transactions.batch();
282
283        // transactions index
284        {
285            let info = TransactionInfo {
286                checkpoint: checkpoint.checkpoint_summary.sequence_number,
287            };
288
289            batch.insert_batch(
290                &self.transactions,
291                checkpoint
292                    .checkpoint_contents
293                    .iter()
294                    .map(|digests| (digests.transaction, info)),
295            )?;
296        }
297
298        // object indexes
299        {
300            let mut coin_index = HashMap::new();
301
302            for tx in &checkpoint.transactions {
303                // determine changes from removed objects
304                for removed_object in tx.removed_objects_pre_version() {
305                    match removed_object.owner() {
306                        Owner::AddressOwner(address) => {
307                            let owner_key = OwnerIndexKey::new(*address, removed_object.id());
308                            batch.delete_batch(&self.owner, [owner_key])?;
309                        }
310                        Owner::ObjectOwner(object_id) => {
311                            batch.delete_batch(
312                                &self.dynamic_field,
313                                [DynamicFieldKey::new(*object_id, removed_object.id())],
314                            )?;
315                        }
316                        Owner::Shared { .. } | Owner::Immutable => {}
317                    }
318                }
319
320                // determine changes from changed objects
321                for (object, old_object) in tx.changed_objects() {
322                    if let Some(old_object) = old_object {
323                        if old_object.owner() != object.owner() {
324                            match old_object.owner() {
325                                Owner::AddressOwner(address) => {
326                                    let owner_key = OwnerIndexKey::new(*address, old_object.id());
327                                    batch.delete_batch(&self.owner, [owner_key])?;
328                                }
329
330                                Owner::ObjectOwner(object_id) => {
331                                    batch.delete_batch(
332                                        &self.dynamic_field,
333                                        [DynamicFieldKey::new(*object_id, old_object.id())],
334                                    )?;
335                                }
336
337                                Owner::Shared { .. } | Owner::Immutable => {}
338                            }
339                        }
340                    }
341
342                    match object.owner() {
343                        Owner::AddressOwner(owner) => {
344                            let owner_key = OwnerIndexKey::new(*owner, object.id());
345                            let owner_info = OwnerIndexInfo::new(object);
346                            batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
347                        }
348                        Owner::ObjectOwner(parent) => {
349                            if let Some(field_info) =
350                                try_create_dynamic_field_info(object, resolver)
351                                    .ok()
352                                    .flatten()
353                            {
354                                let field_key = DynamicFieldKey::new(*parent, object.id());
355
356                                batch
357                                    .insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
358                            }
359                        }
360                        Owner::Shared { .. } | Owner::Immutable => {}
361                    }
362                }
363
364                // coin indexing
365                //
366                // coin indexing relies on the fact that CoinMetadata and TreasuryCap are
367                // created in the same transaction so we don't need to worry
368                // about overriding any older value that may exist in the
369                // database (because there necessarily cannot be).
370                for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
371                    use std::collections::hash_map::Entry;
372
373                    match coin_index.entry(key) {
374                        Entry::Occupied(o) => {
375                            let (key, v) = o.remove_entry();
376                            let value = value.merge(v);
377                            batch.insert_batch(&self.coin, [(key, value)])?;
378                        }
379                        Entry::Vacant(v) => {
380                            v.insert(value);
381                        }
382                    }
383                }
384            }
385
386            batch.insert_batch(&self.coin, coin_index)?;
387        }
388
389        debug!(
390            checkpoint = checkpoint.checkpoint_summary.sequence_number,
391            "finished indexing checkpoint"
392        );
393
394        Ok(batch)
395    }
396
397    fn get_transaction_info(
398        &self,
399        digest: &TransactionDigest,
400    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
401        self.transactions.get(digest)
402    }
403
404    fn owner_iter(
405        &self,
406        owner: IotaAddress,
407        cursor: Option<ObjectID>,
408    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
409        let lower_bound = OwnerIndexKey::new(owner, ObjectID::ZERO);
410        let upper_bound = OwnerIndexKey::new(owner, ObjectID::MAX);
411        let mut iter = self
412            .owner
413            .iter_with_bounds(Some(lower_bound), Some(upper_bound));
414
415        if let Some(cursor) = cursor {
416            iter = iter.skip_to(&OwnerIndexKey::new(owner, cursor))?;
417        }
418
419        Ok(iter)
420    }
421
422    fn dynamic_field_iter(
423        &self,
424        parent: ObjectID,
425        cursor: Option<ObjectID>,
426    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
427    {
428        let lower_bound = DynamicFieldKey::new(parent, ObjectID::ZERO);
429        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
430        let mut iter = self
431            .dynamic_field
432            .iter_with_bounds(Some(lower_bound), Some(upper_bound));
433
434        if let Some(cursor) = cursor {
435            iter = iter.skip_to(&DynamicFieldKey::new(parent, cursor))?;
436        }
437
438        Ok(iter)
439    }
440
441    fn get_coin_info(
442        &self,
443        coin_type: &StructTag,
444    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
445        let key = CoinIndexKey {
446            coin_type: coin_type.to_owned(),
447        };
448        self.coin.get(&key)
449    }
450}
451
452pub struct RestIndexStore {
453    tables: IndexStoreTables,
454    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
455}
456
457impl RestIndexStore {
458    pub fn new(
459        path: PathBuf,
460        authority_store: &AuthorityStore,
461        checkpoint_store: &CheckpointStore,
462        epoch_store: &AuthorityPerEpochStore,
463        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
464    ) -> Self {
465        let tables = {
466            let tables = IndexStoreTables::open(&path);
467
468            // If the index tables are uninitialized or on an older version then we need to
469            // populate them
470            if tables.needs_to_do_initialization() {
471                let mut tables = if tables.needs_to_delete_old_db() {
472                    drop(tables);
473                    typed_store::rocks::safe_drop_db(path.clone())
474                        .expect("unable to destroy old rest-index db");
475                    IndexStoreTables::open(path)
476                } else {
477                    tables
478                };
479
480                tables
481                    .init(
482                        authority_store,
483                        checkpoint_store,
484                        epoch_store,
485                        package_store,
486                    )
487                    .expect("unable to initialize rest index from live object set");
488                tables
489            } else {
490                tables
491            }
492        };
493
494        Self {
495            tables,
496            pending_updates: Default::default(),
497        }
498    }
499
500    pub fn new_without_init(path: PathBuf) -> Self {
501        let tables = IndexStoreTables::open(path);
502
503        Self {
504            tables,
505            pending_updates: Default::default(),
506        }
507    }
508
509    pub fn prune(
510        &self,
511        checkpoint_contents_to_prune: &[CheckpointContents],
512    ) -> Result<(), TypedStoreError> {
513        self.tables.prune(checkpoint_contents_to_prune)
514    }
515
516    /// Index a checkpoint and stage the index updated in `pending_updates`.
517    ///
518    /// Updates will not be committed to the database until
519    /// `commit_update_for_checkpoint` is called.
520    pub fn index_checkpoint(
521        &self,
522        checkpoint: &CheckpointData,
523        resolver: &mut dyn LayoutResolver,
524    ) -> Result<(), StorageError> {
525        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
526        let batch = self.tables.index_checkpoint(checkpoint, resolver)?;
527
528        self.pending_updates
529            .lock()
530            .unwrap()
531            .insert(sequence_number, batch);
532
533        Ok(())
534    }
535
536    /// Commits the pending updates for the provided checkpoint number.
537    ///
538    /// Invariants:
539    /// - `index_checkpoint` must have been called for the provided checkpoint
540    /// - Callers of this function must ensure that it is called for each
541    ///   checkpoint in sequential order. This will panic if the provided
542    ///   checkpoint does not match the expected next checkpoint to commit.
543    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
544        let next_batch = self.pending_updates.lock().unwrap().pop_first();
545
546        // Its expected that the next batch exists
547        let (next_sequence_number, batch) = next_batch.unwrap();
548        assert_eq!(
549            checkpoint, next_sequence_number,
550            "commit_update_for_checkpoint must be called in order"
551        );
552
553        Ok(batch.write()?)
554    }
555
556    pub fn get_transaction_info(
557        &self,
558        digest: &TransactionDigest,
559    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
560        self.tables.get_transaction_info(digest)
561    }
562
563    pub fn owner_iter(
564        &self,
565        owner: IotaAddress,
566        cursor: Option<ObjectID>,
567    ) -> Result<impl Iterator<Item = (OwnerIndexKey, OwnerIndexInfo)> + '_, TypedStoreError> {
568        self.tables.owner_iter(owner, cursor)
569    }
570
571    pub fn dynamic_field_iter(
572        &self,
573        parent: ObjectID,
574        cursor: Option<ObjectID>,
575    ) -> Result<impl Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_, TypedStoreError>
576    {
577        self.tables.dynamic_field_iter(parent, cursor)
578    }
579
580    pub fn get_coin_info(
581        &self,
582        coin_type: &StructTag,
583    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
584        self.tables.get_coin_info(coin_type)
585    }
586}
587
588fn try_create_dynamic_field_info(
589    object: &Object,
590    resolver: &mut dyn LayoutResolver,
591) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
592    // Skip if not a move object
593    let Some(move_object) = object.data.try_as_move() else {
594        return Ok(None);
595    };
596
597    // Skip any objects that aren't of type `Field<Name, Value>`
598    //
599    // All dynamic fields are of type:
600    //   - Field<Name, Value> for dynamic fields
601    //   - Field<Wrapper<Name, ID>> for dynamic field objects where the ID is the id
602    //     of the pointed
603    //   to object
604    //
605    if !move_object.type_().is_dynamic_field() {
606        return Ok(None);
607    }
608
609    let layout = resolver
610        .get_annotated_layout(&move_object.type_().clone().into())
611        .map_err(StorageError::custom)?
612        .into_layout();
613
614    let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
615        .map_err(StorageError::custom)?;
616
617    let value_metadata = field.value_metadata().map_err(StorageError::custom)?;
618
619    Ok(Some(DynamicFieldIndexInfo {
620        name_type: field.name_layout.into(),
621        name_value: field.name_bytes.to_owned(),
622        dynamic_field_type: field.kind,
623        dynamic_object_id: if let DFV::ValueMetadata::DynamicObjectField(id) = value_metadata {
624            Some(id)
625        } else {
626            None
627        },
628    }))
629}
630
631fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
632    use iota_types::coin::{CoinMetadata, TreasuryCap};
633
634    object
635        .type_()
636        .and_then(MoveObjectType::other)
637        .and_then(|object_type| {
638            CoinMetadata::is_coin_metadata_with_coin_type(object_type)
639                .cloned()
640                .map(|coin_type| {
641                    (
642                        CoinIndexKey { coin_type },
643                        CoinIndexInfo {
644                            coin_metadata_object_id: Some(object.id()),
645                            treasury_object_id: None,
646                        },
647                    )
648                })
649                .or_else(|| {
650                    TreasuryCap::is_treasury_with_coin_type(object_type)
651                        .cloned()
652                        .map(|coin_type| {
653                            (
654                                CoinIndexKey { coin_type },
655                                CoinIndexInfo {
656                                    coin_metadata_object_id: None,
657                                    treasury_object_id: Some(object.id()),
658                                },
659                            )
660                        })
661                })
662        })
663}
664
665struct RestParLiveObjectSetIndexer<'a> {
666    tables: &'a IndexStoreTables,
667    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
668    epoch_store: &'a AuthorityPerEpochStore,
669    package_store: &'a Arc<dyn BackingPackageStore + Send + Sync>,
670}
671
672struct RestLiveObjectIndexer<'a> {
673    tables: &'a IndexStoreTables,
674    batch: typed_store::rocks::DBBatch,
675    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
676    resolver: Box<dyn LayoutResolver + 'a>,
677}
678
679impl<'a> ParMakeLiveObjectIndexer for RestParLiveObjectSetIndexer<'a> {
680    type ObjectIndexer = RestLiveObjectIndexer<'a>;
681
682    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
683        RestLiveObjectIndexer {
684            tables: self.tables,
685            batch: self.tables.owner.batch(),
686            coin_index: self.coin_index,
687            resolver: self
688                .epoch_store
689                .executor()
690                .type_layout_resolver(Box::new(self.package_store)),
691        }
692    }
693}
694
695impl LiveObjectIndexer for RestLiveObjectIndexer<'_> {
696    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
697        match object.owner {
698            // Owner Index
699            Owner::AddressOwner(owner) => {
700                let owner_key = OwnerIndexKey::new(owner, object.id());
701                let owner_info = OwnerIndexInfo::new(&object);
702                self.batch
703                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
704            }
705
706            // Dynamic Field Index
707            Owner::ObjectOwner(parent) => {
708                if let Some(field_info) =
709                    try_create_dynamic_field_info(&object, self.resolver.as_mut())?
710                {
711                    let field_key = DynamicFieldKey::new(parent, object.id());
712
713                    self.batch
714                        .insert_batch(&self.tables.dynamic_field, [(field_key, field_info)])?;
715                }
716            }
717
718            Owner::Shared { .. } | Owner::Immutable => {}
719        }
720
721        // Look for CoinMetadata<T> and TreasuryCap<T> objects
722        if let Some((key, value)) = try_create_coin_index_info(&object) {
723            use std::collections::hash_map::Entry;
724
725            match self.coin_index.lock().unwrap().entry(key) {
726                Entry::Occupied(o) => {
727                    let (key, v) = o.remove_entry();
728                    let value = value.merge(v);
729                    self.batch.insert_batch(&self.tables.coin, [(key, value)])?;
730                }
731                Entry::Vacant(v) => {
732                    v.insert(value);
733                }
734            }
735        }
736
737        // If the batch size grows to greater that 128MB then write out to the DB so
738        // that the data we need to hold in memory doesn't grown unbounded.
739        if self.batch.size_in_bytes() >= 1 << 27 {
740            std::mem::replace(&mut self.batch, self.tables.owner.batch()).write()?;
741        }
742
743        Ok(())
744    }
745
746    fn finish(self) -> Result<(), StorageError> {
747        self.batch.write()?;
748        Ok(())
749    }
750}