iota_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_types::{
8    base_types::{IotaAddress, ObjectID, TransactionDigest},
9    committee::{Committee, EpochId},
10    digests::TransactionEventsDigest,
11    effects::{TransactionEffects, TransactionEvents},
12    error::IotaError,
13    messages_checkpoint::{
14        CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16    },
17    object::Object,
18    storage::{
19        AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20        ObjectStore, ReadStore, RestIndexes, RestStateReader, WriteStore,
21        error::{Error as StorageError, Result},
22    },
23    transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28
29use crate::{
30    authority::AuthorityState,
31    checkpoints::CheckpointStore,
32    epoch::committee_store::CommitteeStore,
33    execution_cache::ExecutionCacheTraitPointers,
34    rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
35};
36
37#[derive(Clone)]
38pub struct RocksDbStore {
39    cache_traits: ExecutionCacheTraitPointers,
40
41    committee_store: Arc<CommitteeStore>,
42    checkpoint_store: Arc<CheckpointStore>,
43    // in memory checkpoint watermark sequence numbers
44    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
45    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
46}
47
48impl RocksDbStore {
49    pub fn new(
50        cache_traits: ExecutionCacheTraitPointers,
51        committee_store: Arc<CommitteeStore>,
52        checkpoint_store: Arc<CheckpointStore>,
53    ) -> Self {
54        Self {
55            cache_traits,
56            committee_store,
57            checkpoint_store,
58            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
59            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
60        }
61    }
62
63    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
64        self.cache_traits
65            .object_cache_reader
66            .try_multi_get_objects_by_key(object_keys)
67    }
68
69    pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
70        Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
71    }
72}
73
74impl ReadStore for RocksDbStore {
75    fn try_get_checkpoint_by_digest(
76        &self,
77        digest: &CheckpointDigest,
78    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
79        self.checkpoint_store
80            .get_checkpoint_by_digest(digest)
81            .map_err(Into::into)
82    }
83
84    fn try_get_checkpoint_by_sequence_number(
85        &self,
86        sequence_number: CheckpointSequenceNumber,
87    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
88        self.checkpoint_store
89            .get_checkpoint_by_sequence_number(sequence_number)
90            .map_err(Into::into)
91    }
92
93    fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
94        self.checkpoint_store
95            .get_highest_verified_checkpoint()
96            .map(|maybe_checkpoint| {
97                maybe_checkpoint
98                    .expect("storage should have been initialized with genesis checkpoint")
99            })
100            .map_err(Into::into)
101    }
102
103    fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
104        self.checkpoint_store
105            .get_highest_synced_checkpoint()
106            .map(|maybe_checkpoint| {
107                maybe_checkpoint
108                    .expect("storage should have been initialized with genesis checkpoint")
109            })
110            .map_err(Into::into)
111    }
112
113    fn try_get_lowest_available_checkpoint(
114        &self,
115    ) -> Result<CheckpointSequenceNumber, StorageError> {
116        let highest_pruned_cp = self
117            .checkpoint_store
118            .get_highest_pruned_checkpoint_seq_number()
119            .map_err(Into::<StorageError>::into)?;
120
121        if highest_pruned_cp == 0 {
122            Ok(0)
123        } else {
124            Ok(highest_pruned_cp + 1)
125        }
126    }
127
128    fn try_get_full_checkpoint_contents_by_sequence_number(
129        &self,
130        sequence_number: CheckpointSequenceNumber,
131    ) -> Result<Option<FullCheckpointContents>, StorageError> {
132        self.checkpoint_store
133            .get_full_checkpoint_contents_by_sequence_number(sequence_number)
134            .map_err(Into::into)
135    }
136
137    fn try_get_full_checkpoint_contents(
138        &self,
139        digest: &CheckpointContentsDigest,
140    ) -> Result<Option<FullCheckpointContents>, StorageError> {
141        // First look to see if we saved the complete contents already.
142        if let Some(seq_num) = self
143            .checkpoint_store
144            .get_sequence_number_by_contents_digest(digest)
145            .map_err(iota_types::storage::error::Error::custom)?
146        {
147            let contents = self
148                .checkpoint_store
149                .get_full_checkpoint_contents_by_sequence_number(seq_num)
150                .map_err(iota_types::storage::error::Error::custom)?;
151            if contents.is_some() {
152                return Ok(contents);
153            }
154        }
155
156        // Otherwise gather it from the individual components.
157        // Note we can't insert the constructed contents into `full_checkpoint_content`,
158        // because it needs to be inserted along with
159        // `checkpoint_sequence_by_contents_digest` and `checkpoint_content`.
160        // However at this point it's likely we don't know the corresponding
161        // sequence number yet.
162        self.checkpoint_store
163            .get_checkpoint_contents(digest)
164            .map_err(iota_types::storage::error::Error::custom)?
165            .map(|contents| {
166                let mut transactions = Vec::with_capacity(contents.size());
167                for tx in contents.iter() {
168                    if let (Some(t), Some(e)) = (
169                        self.try_get_transaction(&tx.transaction)?,
170                        self.cache_traits
171                            .transaction_cache_reader
172                            .try_get_effects(&tx.effects)
173                            .map_err(iota_types::storage::error::Error::custom)?,
174                    ) {
175                        transactions.push(iota_types::base_types::ExecutionData::new(
176                            (*t).clone().into_inner(),
177                            e,
178                        ))
179                    } else {
180                        return Result::<
181                            Option<FullCheckpointContents>,
182                            iota_types::storage::error::Error,
183                        >::Ok(None);
184                    }
185                }
186                Ok(Some(
187                    FullCheckpointContents::from_contents_and_execution_data(
188                        contents,
189                        transactions.into_iter(),
190                    ),
191                ))
192            })
193            .transpose()
194            .map(|contents| contents.flatten())
195            .map_err(iota_types::storage::error::Error::custom)
196    }
197
198    fn try_get_committee(
199        &self,
200        epoch: EpochId,
201    ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
202        Ok(self.committee_store.get_committee(&epoch).unwrap())
203    }
204
205    fn try_get_transaction(
206        &self,
207        digest: &TransactionDigest,
208    ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
209        self.cache_traits
210            .transaction_cache_reader
211            .try_get_transaction_block(digest)
212            .map_err(StorageError::custom)
213    }
214
215    fn try_get_transaction_effects(
216        &self,
217        digest: &TransactionDigest,
218    ) -> Result<Option<TransactionEffects>, StorageError> {
219        self.cache_traits
220            .transaction_cache_reader
221            .try_get_executed_effects(digest)
222            .map_err(StorageError::custom)
223    }
224
225    fn try_get_events(
226        &self,
227        digest: &TransactionEventsDigest,
228    ) -> Result<Option<TransactionEvents>, StorageError> {
229        self.cache_traits
230            .transaction_cache_reader
231            .try_get_events(digest)
232            .map_err(StorageError::custom)
233    }
234
235    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
236        self.checkpoint_store
237            .get_highest_executed_checkpoint()
238            .map_err(iota_types::storage::error::Error::custom)?
239            .ok_or_else(|| {
240                iota_types::storage::error::Error::missing("unable to get latest checkpoint")
241            })
242    }
243
244    fn try_get_checkpoint_contents_by_digest(
245        &self,
246        digest: &CheckpointContentsDigest,
247    ) -> iota_types::storage::error::Result<
248        Option<iota_types::messages_checkpoint::CheckpointContents>,
249    > {
250        self.checkpoint_store
251            .get_checkpoint_contents(digest)
252            .map_err(iota_types::storage::error::Error::custom)
253    }
254
255    fn try_get_checkpoint_contents_by_sequence_number(
256        &self,
257        sequence_number: CheckpointSequenceNumber,
258    ) -> iota_types::storage::error::Result<
259        Option<iota_types::messages_checkpoint::CheckpointContents>,
260    > {
261        match self.try_get_checkpoint_by_sequence_number(sequence_number) {
262            Ok(Some(checkpoint)) => {
263                self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
264            }
265            Ok(None) => Ok(None),
266            Err(e) => Err(e),
267        }
268    }
269}
270
271impl ObjectStore for RocksDbStore {
272    fn try_get_object(
273        &self,
274        object_id: &iota_types::base_types::ObjectID,
275    ) -> iota_types::storage::error::Result<Option<Object>> {
276        self.cache_traits.object_store.try_get_object(object_id)
277    }
278
279    fn try_get_object_by_key(
280        &self,
281        object_id: &iota_types::base_types::ObjectID,
282        version: iota_types::base_types::VersionNumber,
283    ) -> iota_types::storage::error::Result<Option<Object>> {
284        self.cache_traits
285            .object_store
286            .try_get_object_by_key(object_id, version)
287    }
288}
289
290impl WriteStore for RocksDbStore {
291    fn try_insert_checkpoint(
292        &self,
293        checkpoint: &VerifiedCheckpoint,
294    ) -> Result<(), iota_types::storage::error::Error> {
295        if let Some(EndOfEpochData {
296            next_epoch_committee,
297            ..
298        }) = checkpoint.end_of_epoch_data.as_ref()
299        {
300            let next_committee = next_epoch_committee.iter().cloned().collect();
301            let committee =
302                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
303            self.try_insert_committee(committee)?;
304        }
305
306        self.checkpoint_store
307            .insert_verified_checkpoint(checkpoint)
308            .map_err(Into::into)
309    }
310
311    fn try_update_highest_synced_checkpoint(
312        &self,
313        checkpoint: &VerifiedCheckpoint,
314    ) -> Result<(), iota_types::storage::error::Error> {
315        let mut locked = self.highest_synced_checkpoint.lock();
316        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
317            return Ok(());
318        }
319        self.checkpoint_store
320            .update_highest_synced_checkpoint(checkpoint)
321            .map_err(iota_types::storage::error::Error::custom)?;
322        *locked = Some(checkpoint.sequence_number);
323        Ok(())
324    }
325
326    fn try_update_highest_verified_checkpoint(
327        &self,
328        checkpoint: &VerifiedCheckpoint,
329    ) -> Result<(), iota_types::storage::error::Error> {
330        let mut locked = self.highest_verified_checkpoint.lock();
331        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
332            return Ok(());
333        }
334        self.checkpoint_store
335            .update_highest_verified_checkpoint(checkpoint)
336            .map_err(iota_types::storage::error::Error::custom)?;
337        *locked = Some(checkpoint.sequence_number);
338        Ok(())
339    }
340
341    fn try_insert_checkpoint_contents(
342        &self,
343        checkpoint: &VerifiedCheckpoint,
344        contents: VerifiedCheckpointContents,
345    ) -> Result<(), iota_types::storage::error::Error> {
346        self.cache_traits
347            .state_sync_store
348            .try_multi_insert_transaction_and_effects(contents.transactions())
349            .map_err(iota_types::storage::error::Error::custom)?;
350        self.checkpoint_store
351            .insert_verified_checkpoint_contents(checkpoint, contents)
352            .map_err(Into::into)
353    }
354
355    fn try_insert_committee(
356        &self,
357        new_committee: Committee,
358    ) -> Result<(), iota_types::storage::error::Error> {
359        self.committee_store
360            .insert_new_committee(&new_committee)
361            .unwrap();
362        Ok(())
363    }
364}
365
366pub struct RestReadStore {
367    state: Arc<AuthorityState>,
368    rocks: RocksDbStore,
369}
370
371impl RestReadStore {
372    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
373        Self { state, rocks }
374    }
375
376    fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
377        self.state.rest_index.as_deref().ok_or_else(|| {
378            iota_types::storage::error::Error::custom("rest index store is disabled")
379        })
380    }
381}
382
383impl ObjectStore for RestReadStore {
384    fn try_get_object(
385        &self,
386        object_id: &iota_types::base_types::ObjectID,
387    ) -> iota_types::storage::error::Result<Option<Object>> {
388        self.rocks.try_get_object(object_id)
389    }
390
391    fn try_get_object_by_key(
392        &self,
393        object_id: &iota_types::base_types::ObjectID,
394        version: iota_types::base_types::VersionNumber,
395    ) -> iota_types::storage::error::Result<Option<Object>> {
396        self.rocks.try_get_object_by_key(object_id, version)
397    }
398}
399
400impl ReadStore for RestReadStore {
401    fn try_get_committee(
402        &self,
403        epoch: EpochId,
404    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
405        self.rocks.try_get_committee(epoch)
406    }
407
408    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
409        self.rocks.try_get_latest_checkpoint()
410    }
411
412    fn try_get_highest_verified_checkpoint(
413        &self,
414    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
415        self.rocks.try_get_highest_verified_checkpoint()
416    }
417
418    fn try_get_highest_synced_checkpoint(
419        &self,
420    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
421        self.rocks.try_get_highest_synced_checkpoint()
422    }
423
424    fn try_get_lowest_available_checkpoint(
425        &self,
426    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
427        self.rocks.try_get_lowest_available_checkpoint()
428    }
429
430    fn try_get_checkpoint_by_digest(
431        &self,
432        digest: &CheckpointDigest,
433    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
434        self.rocks.try_get_checkpoint_by_digest(digest)
435    }
436
437    fn try_get_checkpoint_by_sequence_number(
438        &self,
439        sequence_number: CheckpointSequenceNumber,
440    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
441        self.rocks
442            .try_get_checkpoint_by_sequence_number(sequence_number)
443    }
444
445    fn try_get_checkpoint_contents_by_digest(
446        &self,
447        digest: &CheckpointContentsDigest,
448    ) -> iota_types::storage::error::Result<
449        Option<iota_types::messages_checkpoint::CheckpointContents>,
450    > {
451        self.rocks.try_get_checkpoint_contents_by_digest(digest)
452    }
453
454    fn try_get_checkpoint_contents_by_sequence_number(
455        &self,
456        sequence_number: CheckpointSequenceNumber,
457    ) -> iota_types::storage::error::Result<
458        Option<iota_types::messages_checkpoint::CheckpointContents>,
459    > {
460        self.rocks
461            .try_get_checkpoint_contents_by_sequence_number(sequence_number)
462    }
463
464    fn try_get_transaction(
465        &self,
466        digest: &TransactionDigest,
467    ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
468        self.rocks.try_get_transaction(digest)
469    }
470
471    fn try_get_transaction_effects(
472        &self,
473        digest: &TransactionDigest,
474    ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
475        self.rocks.try_get_transaction_effects(digest)
476    }
477
478    fn try_get_events(
479        &self,
480        digest: &TransactionEventsDigest,
481    ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
482        self.rocks.try_get_events(digest)
483    }
484
485    fn try_get_full_checkpoint_contents_by_sequence_number(
486        &self,
487        sequence_number: CheckpointSequenceNumber,
488    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
489        self.rocks
490            .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
491    }
492
493    fn try_get_full_checkpoint_contents(
494        &self,
495        digest: &CheckpointContentsDigest,
496    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
497        self.rocks.try_get_full_checkpoint_contents(digest)
498    }
499}
500
501impl RestStateReader for RestReadStore {
502    fn get_lowest_available_checkpoint_objects(
503        &self,
504    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
505        let highest_pruned_cp = self
506            .state
507            .get_object_cache_reader()
508            .try_get_highest_pruned_checkpoint()
509            .map_err(StorageError::custom)?;
510
511        if highest_pruned_cp == 0 {
512            Ok(0)
513        } else {
514            Ok(highest_pruned_cp + 1)
515        }
516    }
517
518    fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
519        Ok(self.state.get_chain_identifier())
520    }
521
522    fn get_epoch_last_checkpoint(
523        &self,
524        epoch_id: EpochId,
525    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
526        self.rocks
527            .checkpoint_store
528            .get_epoch_last_checkpoint(epoch_id)
529            .map_err(iota_types::storage::error::Error::custom)
530    }
531
532    fn indexes(&self) -> Option<&dyn RestIndexes> {
533        self.index().ok().map(|index| index as _)
534    }
535}
536
537impl RestIndexes for RestIndexStore {
538    fn get_transaction_checkpoint(
539        &self,
540        digest: &TransactionDigest,
541    ) -> iota_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
542        self.get_transaction_info(digest)
543            .map(|maybe_info| maybe_info.map(|info| info.checkpoint))
544            .map_err(StorageError::custom)
545    }
546
547    fn account_owned_objects_info_iter(
548        &self,
549        owner: IotaAddress,
550        cursor: Option<ObjectID>,
551    ) -> Result<Box<dyn Iterator<Item = AccountOwnedObjectInfo> + '_>> {
552        let iter = self.owner_iter(owner, cursor)?.map(
553            |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
554                AccountOwnedObjectInfo {
555                    owner,
556                    object_id,
557                    version,
558                    type_,
559                }
560            },
561        );
562
563        Ok(Box::new(iter) as _)
564    }
565
566    fn dynamic_field_iter(
567        &self,
568        parent: ObjectID,
569        cursor: Option<ObjectID>,
570    ) -> iota_types::storage::error::Result<
571        Box<dyn Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_>,
572    > {
573        let iter = self.dynamic_field_iter(parent, cursor)?;
574
575        Ok(Box::new(iter) as _)
576    }
577
578    fn get_coin_info(
579        &self,
580        coin_type: &StructTag,
581    ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
582        self.get_coin_info(coin_type)?
583            .map(
584                |CoinIndexInfo {
585                     coin_metadata_object_id,
586                     treasury_object_id,
587                 }| CoinInfo {
588                    coin_metadata_object_id,
589                    treasury_object_id,
590                },
591            )
592            .pipe(Ok)
593    }
594}