iota_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_types::{
8    base_types::{IotaAddress, ObjectID, TransactionDigest},
9    committee::{Committee, EpochId},
10    digests::TransactionEventsDigest,
11    effects::{TransactionEffects, TransactionEvents},
12    error::IotaError,
13    messages_checkpoint::{
14        CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16    },
17    object::Object,
18    storage::{
19        AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20        ObjectStore, ReadStore, RestIndexes, RestStateReader, WriteStore,
21        error::{Error as StorageError, Result},
22    },
23    transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28use tracing::instrument;
29
30use crate::{
31    authority::AuthorityState,
32    checkpoints::CheckpointStore,
33    epoch::committee_store::CommitteeStore,
34    execution_cache::ExecutionCacheTraitPointers,
35    rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
36};
37
38#[derive(Clone)]
39pub struct RocksDbStore {
40    cache_traits: ExecutionCacheTraitPointers,
41
42    committee_store: Arc<CommitteeStore>,
43    checkpoint_store: Arc<CheckpointStore>,
44    // in memory checkpoint watermark sequence numbers
45    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
46    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
47}
48
49impl RocksDbStore {
50    pub fn new(
51        cache_traits: ExecutionCacheTraitPointers,
52        committee_store: Arc<CommitteeStore>,
53        checkpoint_store: Arc<CheckpointStore>,
54    ) -> Self {
55        Self {
56            cache_traits,
57            committee_store,
58            checkpoint_store,
59            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
60            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
61        }
62    }
63
64    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
65        self.cache_traits
66            .object_cache_reader
67            .try_multi_get_objects_by_key(object_keys)
68    }
69
70    pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
71        Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
72    }
73}
74
75impl ReadStore for RocksDbStore {
76    fn try_get_checkpoint_by_digest(
77        &self,
78        digest: &CheckpointDigest,
79    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
80        self.checkpoint_store
81            .get_checkpoint_by_digest(digest)
82            .map_err(Into::into)
83    }
84
85    fn try_get_checkpoint_by_sequence_number(
86        &self,
87        sequence_number: CheckpointSequenceNumber,
88    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
89        self.checkpoint_store
90            .get_checkpoint_by_sequence_number(sequence_number)
91            .map_err(Into::into)
92    }
93
94    fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
95        self.checkpoint_store
96            .get_highest_verified_checkpoint()
97            .map(|maybe_checkpoint| {
98                maybe_checkpoint
99                    .expect("storage should have been initialized with genesis checkpoint")
100            })
101            .map_err(Into::into)
102    }
103
104    fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
105        self.checkpoint_store
106            .get_highest_synced_checkpoint()
107            .map(|maybe_checkpoint| {
108                maybe_checkpoint
109                    .expect("storage should have been initialized with genesis checkpoint")
110            })
111            .map_err(Into::into)
112    }
113
114    fn try_get_lowest_available_checkpoint(
115        &self,
116    ) -> Result<CheckpointSequenceNumber, StorageError> {
117        let highest_pruned_cp = self
118            .checkpoint_store
119            .get_highest_pruned_checkpoint_seq_number()
120            .map_err(Into::<StorageError>::into)?;
121
122        if highest_pruned_cp == 0 {
123            Ok(0)
124        } else {
125            Ok(highest_pruned_cp + 1)
126        }
127    }
128
129    fn try_get_full_checkpoint_contents_by_sequence_number(
130        &self,
131        sequence_number: CheckpointSequenceNumber,
132    ) -> Result<Option<FullCheckpointContents>, StorageError> {
133        self.checkpoint_store
134            .get_full_checkpoint_contents_by_sequence_number(sequence_number)
135            .map_err(Into::into)
136    }
137
138    fn try_get_full_checkpoint_contents(
139        &self,
140        digest: &CheckpointContentsDigest,
141    ) -> Result<Option<FullCheckpointContents>, StorageError> {
142        // First look to see if we saved the complete contents already.
143        if let Some(seq_num) = self
144            .checkpoint_store
145            .get_sequence_number_by_contents_digest(digest)
146            .map_err(iota_types::storage::error::Error::custom)?
147        {
148            let contents = self
149                .checkpoint_store
150                .get_full_checkpoint_contents_by_sequence_number(seq_num)
151                .map_err(iota_types::storage::error::Error::custom)?;
152            if contents.is_some() {
153                return Ok(contents);
154            }
155        }
156
157        // Otherwise gather it from the individual components.
158        // Note we can't insert the constructed contents into `full_checkpoint_content`,
159        // because it needs to be inserted along with
160        // `checkpoint_sequence_by_contents_digest` and `checkpoint_content`.
161        // However at this point it's likely we don't know the corresponding
162        // sequence number yet.
163        self.checkpoint_store
164            .get_checkpoint_contents(digest)
165            .map_err(iota_types::storage::error::Error::custom)?
166            .map(|contents| {
167                let mut transactions = Vec::with_capacity(contents.size());
168                for tx in contents.iter() {
169                    if let (Some(t), Some(e)) = (
170                        self.try_get_transaction(&tx.transaction)?,
171                        self.cache_traits
172                            .transaction_cache_reader
173                            .try_get_effects(&tx.effects)
174                            .map_err(iota_types::storage::error::Error::custom)?,
175                    ) {
176                        transactions.push(iota_types::base_types::ExecutionData::new(
177                            (*t).clone().into_inner(),
178                            e,
179                        ))
180                    } else {
181                        return Result::<
182                            Option<FullCheckpointContents>,
183                            iota_types::storage::error::Error,
184                        >::Ok(None);
185                    }
186                }
187                Ok(Some(
188                    FullCheckpointContents::from_contents_and_execution_data(
189                        contents,
190                        transactions.into_iter(),
191                    ),
192                ))
193            })
194            .transpose()
195            .map(|contents| contents.flatten())
196            .map_err(iota_types::storage::error::Error::custom)
197    }
198
199    fn try_get_committee(
200        &self,
201        epoch: EpochId,
202    ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
203        Ok(self.committee_store.get_committee(&epoch).unwrap())
204    }
205
206    fn try_get_transaction(
207        &self,
208        digest: &TransactionDigest,
209    ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
210        self.cache_traits
211            .transaction_cache_reader
212            .try_get_transaction_block(digest)
213            .map_err(StorageError::custom)
214    }
215
216    fn try_get_transaction_effects(
217        &self,
218        digest: &TransactionDigest,
219    ) -> Result<Option<TransactionEffects>, StorageError> {
220        self.cache_traits
221            .transaction_cache_reader
222            .try_get_executed_effects(digest)
223            .map_err(StorageError::custom)
224    }
225
226    fn try_get_events(
227        &self,
228        digest: &TransactionEventsDigest,
229    ) -> Result<Option<TransactionEvents>, StorageError> {
230        self.cache_traits
231            .transaction_cache_reader
232            .try_get_events(digest)
233            .map_err(StorageError::custom)
234    }
235
236    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
237        self.checkpoint_store
238            .get_highest_executed_checkpoint()
239            .map_err(iota_types::storage::error::Error::custom)?
240            .ok_or_else(|| {
241                iota_types::storage::error::Error::missing("unable to get latest checkpoint")
242            })
243    }
244
245    fn try_get_checkpoint_contents_by_digest(
246        &self,
247        digest: &CheckpointContentsDigest,
248    ) -> iota_types::storage::error::Result<
249        Option<iota_types::messages_checkpoint::CheckpointContents>,
250    > {
251        self.checkpoint_store
252            .get_checkpoint_contents(digest)
253            .map_err(iota_types::storage::error::Error::custom)
254    }
255
256    fn try_get_checkpoint_contents_by_sequence_number(
257        &self,
258        sequence_number: CheckpointSequenceNumber,
259    ) -> iota_types::storage::error::Result<
260        Option<iota_types::messages_checkpoint::CheckpointContents>,
261    > {
262        match self.try_get_checkpoint_by_sequence_number(sequence_number) {
263            Ok(Some(checkpoint)) => {
264                self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
265            }
266            Ok(None) => Ok(None),
267            Err(e) => Err(e),
268        }
269    }
270}
271
272impl ObjectStore for RocksDbStore {
273    fn try_get_object(
274        &self,
275        object_id: &iota_types::base_types::ObjectID,
276    ) -> iota_types::storage::error::Result<Option<Object>> {
277        self.cache_traits.object_store.try_get_object(object_id)
278    }
279
280    fn try_get_object_by_key(
281        &self,
282        object_id: &iota_types::base_types::ObjectID,
283        version: iota_types::base_types::VersionNumber,
284    ) -> iota_types::storage::error::Result<Option<Object>> {
285        self.cache_traits
286            .object_store
287            .try_get_object_by_key(object_id, version)
288    }
289}
290
291impl WriteStore for RocksDbStore {
292    #[instrument(level = "trace", skip_all)]
293    fn try_insert_checkpoint(
294        &self,
295        checkpoint: &VerifiedCheckpoint,
296    ) -> Result<(), iota_types::storage::error::Error> {
297        if let Some(EndOfEpochData {
298            next_epoch_committee,
299            ..
300        }) = checkpoint.end_of_epoch_data.as_ref()
301        {
302            let next_committee = next_epoch_committee.iter().cloned().collect();
303            let committee =
304                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
305            self.try_insert_committee(committee)?;
306        }
307
308        self.checkpoint_store
309            .insert_verified_checkpoint(checkpoint)
310            .map_err(Into::into)
311    }
312
313    fn try_update_highest_synced_checkpoint(
314        &self,
315        checkpoint: &VerifiedCheckpoint,
316    ) -> Result<(), iota_types::storage::error::Error> {
317        let mut locked = self.highest_synced_checkpoint.lock();
318        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
319            return Ok(());
320        }
321        self.checkpoint_store
322            .update_highest_synced_checkpoint(checkpoint)
323            .map_err(iota_types::storage::error::Error::custom)?;
324        *locked = Some(checkpoint.sequence_number);
325        Ok(())
326    }
327
328    fn try_update_highest_verified_checkpoint(
329        &self,
330        checkpoint: &VerifiedCheckpoint,
331    ) -> Result<(), iota_types::storage::error::Error> {
332        let mut locked = self.highest_verified_checkpoint.lock();
333        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
334            return Ok(());
335        }
336        self.checkpoint_store
337            .update_highest_verified_checkpoint(checkpoint)
338            .map_err(iota_types::storage::error::Error::custom)?;
339        *locked = Some(checkpoint.sequence_number);
340        Ok(())
341    }
342
343    fn try_insert_checkpoint_contents(
344        &self,
345        checkpoint: &VerifiedCheckpoint,
346        contents: VerifiedCheckpointContents,
347    ) -> Result<(), iota_types::storage::error::Error> {
348        self.cache_traits
349            .state_sync_store
350            .try_multi_insert_transaction_and_effects(contents.transactions())
351            .map_err(iota_types::storage::error::Error::custom)?;
352        self.checkpoint_store
353            .insert_verified_checkpoint_contents(checkpoint, contents)
354            .map_err(Into::into)
355    }
356
357    fn try_insert_committee(
358        &self,
359        new_committee: Committee,
360    ) -> Result<(), iota_types::storage::error::Error> {
361        self.committee_store
362            .insert_new_committee(&new_committee)
363            .unwrap();
364        Ok(())
365    }
366}
367
368pub struct RestReadStore {
369    state: Arc<AuthorityState>,
370    rocks: RocksDbStore,
371}
372
373impl RestReadStore {
374    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
375        Self { state, rocks }
376    }
377
378    fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
379        self.state.rest_index.as_deref().ok_or_else(|| {
380            iota_types::storage::error::Error::custom("rest index store is disabled")
381        })
382    }
383}
384
385impl ObjectStore for RestReadStore {
386    fn try_get_object(
387        &self,
388        object_id: &iota_types::base_types::ObjectID,
389    ) -> iota_types::storage::error::Result<Option<Object>> {
390        self.rocks.try_get_object(object_id)
391    }
392
393    fn try_get_object_by_key(
394        &self,
395        object_id: &iota_types::base_types::ObjectID,
396        version: iota_types::base_types::VersionNumber,
397    ) -> iota_types::storage::error::Result<Option<Object>> {
398        self.rocks.try_get_object_by_key(object_id, version)
399    }
400}
401
402impl ReadStore for RestReadStore {
403    fn try_get_committee(
404        &self,
405        epoch: EpochId,
406    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
407        self.rocks.try_get_committee(epoch)
408    }
409
410    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
411        self.rocks.try_get_latest_checkpoint()
412    }
413
414    fn try_get_highest_verified_checkpoint(
415        &self,
416    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
417        self.rocks.try_get_highest_verified_checkpoint()
418    }
419
420    fn try_get_highest_synced_checkpoint(
421        &self,
422    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
423        self.rocks.try_get_highest_synced_checkpoint()
424    }
425
426    fn try_get_lowest_available_checkpoint(
427        &self,
428    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
429        self.rocks.try_get_lowest_available_checkpoint()
430    }
431
432    fn try_get_checkpoint_by_digest(
433        &self,
434        digest: &CheckpointDigest,
435    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
436        self.rocks.try_get_checkpoint_by_digest(digest)
437    }
438
439    fn try_get_checkpoint_by_sequence_number(
440        &self,
441        sequence_number: CheckpointSequenceNumber,
442    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
443        self.rocks
444            .try_get_checkpoint_by_sequence_number(sequence_number)
445    }
446
447    fn try_get_checkpoint_contents_by_digest(
448        &self,
449        digest: &CheckpointContentsDigest,
450    ) -> iota_types::storage::error::Result<
451        Option<iota_types::messages_checkpoint::CheckpointContents>,
452    > {
453        self.rocks.try_get_checkpoint_contents_by_digest(digest)
454    }
455
456    fn try_get_checkpoint_contents_by_sequence_number(
457        &self,
458        sequence_number: CheckpointSequenceNumber,
459    ) -> iota_types::storage::error::Result<
460        Option<iota_types::messages_checkpoint::CheckpointContents>,
461    > {
462        self.rocks
463            .try_get_checkpoint_contents_by_sequence_number(sequence_number)
464    }
465
466    fn try_get_transaction(
467        &self,
468        digest: &TransactionDigest,
469    ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
470        self.rocks.try_get_transaction(digest)
471    }
472
473    fn try_get_transaction_effects(
474        &self,
475        digest: &TransactionDigest,
476    ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
477        self.rocks.try_get_transaction_effects(digest)
478    }
479
480    fn try_get_events(
481        &self,
482        digest: &TransactionEventsDigest,
483    ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
484        self.rocks.try_get_events(digest)
485    }
486
487    fn try_get_full_checkpoint_contents_by_sequence_number(
488        &self,
489        sequence_number: CheckpointSequenceNumber,
490    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
491        self.rocks
492            .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
493    }
494
495    fn try_get_full_checkpoint_contents(
496        &self,
497        digest: &CheckpointContentsDigest,
498    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
499        self.rocks.try_get_full_checkpoint_contents(digest)
500    }
501}
502
503impl RestStateReader for RestReadStore {
504    fn get_lowest_available_checkpoint_objects(
505        &self,
506    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
507        let highest_pruned_cp = self
508            .state
509            .get_object_cache_reader()
510            .try_get_highest_pruned_checkpoint()
511            .map_err(StorageError::custom)?;
512
513        if highest_pruned_cp == 0 {
514            Ok(0)
515        } else {
516            Ok(highest_pruned_cp + 1)
517        }
518    }
519
520    fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
521        Ok(self.state.get_chain_identifier())
522    }
523
524    fn get_epoch_last_checkpoint(
525        &self,
526        epoch_id: EpochId,
527    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
528        self.rocks
529            .checkpoint_store
530            .get_epoch_last_checkpoint(epoch_id)
531            .map_err(iota_types::storage::error::Error::custom)
532    }
533
534    fn indexes(&self) -> Option<&dyn RestIndexes> {
535        self.index().ok().map(|index| index as _)
536    }
537}
538
539impl RestIndexes for RestIndexStore {
540    fn get_transaction_checkpoint(
541        &self,
542        digest: &TransactionDigest,
543    ) -> iota_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
544        self.get_transaction_info(digest)
545            .map(|maybe_info| maybe_info.map(|info| info.checkpoint))
546            .map_err(StorageError::custom)
547    }
548
549    fn account_owned_objects_info_iter(
550        &self,
551        owner: IotaAddress,
552        cursor: Option<ObjectID>,
553    ) -> Result<Box<dyn Iterator<Item = AccountOwnedObjectInfo> + '_>> {
554        let iter = self.owner_iter(owner, cursor)?.map(
555            |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
556                AccountOwnedObjectInfo {
557                    owner,
558                    object_id,
559                    version,
560                    type_,
561                }
562            },
563        );
564
565        Ok(Box::new(iter) as _)
566    }
567
568    fn dynamic_field_iter(
569        &self,
570        parent: ObjectID,
571        cursor: Option<ObjectID>,
572    ) -> iota_types::storage::error::Result<
573        Box<dyn Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_>,
574    > {
575        let iter = self.dynamic_field_iter(parent, cursor)?;
576
577        Ok(Box::new(iter) as _)
578    }
579
580    fn get_coin_info(
581        &self,
582        coin_type: &StructTag,
583    ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
584        self.get_coin_info(coin_type)?
585            .map(
586                |CoinIndexInfo {
587                     coin_metadata_object_id,
588                     treasury_object_id,
589                 }| CoinInfo {
590                    coin_metadata_object_id,
591                    treasury_object_id,
592                },
593            )
594            .pipe(Ok)
595    }
596}