iota_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_types::{
8    base_types::{IotaAddress, ObjectID, TransactionDigest},
9    committee::{Committee, EpochId},
10    digests::TransactionEventsDigest,
11    effects::{TransactionEffects, TransactionEvents},
12    error::IotaError,
13    messages_checkpoint::{
14        CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16    },
17    object::Object,
18    storage::{
19        AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20        ObjectStore, ReadStore, RestStateReader, WriteStore,
21        error::{Error as StorageError, Result},
22    },
23    transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28
29use crate::{
30    authority::AuthorityState,
31    checkpoints::CheckpointStore,
32    epoch::committee_store::CommitteeStore,
33    execution_cache::ExecutionCacheTraitPointers,
34    rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
35};
36
37#[derive(Clone)]
38pub struct RocksDbStore {
39    cache_traits: ExecutionCacheTraitPointers,
40
41    committee_store: Arc<CommitteeStore>,
42    checkpoint_store: Arc<CheckpointStore>,
43    // in memory checkpoint watermark sequence numbers
44    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
45    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
46}
47
48impl RocksDbStore {
49    pub fn new(
50        cache_traits: ExecutionCacheTraitPointers,
51        committee_store: Arc<CommitteeStore>,
52        checkpoint_store: Arc<CheckpointStore>,
53    ) -> Self {
54        Self {
55            cache_traits,
56            committee_store,
57            checkpoint_store,
58            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
59            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
60        }
61    }
62
63    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
64        self.cache_traits
65            .object_cache_reader
66            .multi_get_objects_by_key(object_keys)
67    }
68
69    pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
70        Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
71    }
72}
73
74impl ReadStore for RocksDbStore {
75    fn get_checkpoint_by_digest(
76        &self,
77        digest: &CheckpointDigest,
78    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
79        self.checkpoint_store
80            .get_checkpoint_by_digest(digest)
81            .map_err(Into::into)
82    }
83
84    fn get_checkpoint_by_sequence_number(
85        &self,
86        sequence_number: CheckpointSequenceNumber,
87    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
88        self.checkpoint_store
89            .get_checkpoint_by_sequence_number(sequence_number)
90            .map_err(Into::into)
91    }
92
93    fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
94        self.checkpoint_store
95            .get_highest_verified_checkpoint()
96            .map(|maybe_checkpoint| {
97                maybe_checkpoint
98                    .expect("storage should have been initialized with genesis checkpoint")
99            })
100            .map_err(Into::into)
101    }
102
103    fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
104        self.checkpoint_store
105            .get_highest_synced_checkpoint()
106            .map(|maybe_checkpoint| {
107                maybe_checkpoint
108                    .expect("storage should have been initialized with genesis checkpoint")
109            })
110            .map_err(Into::into)
111    }
112
113    fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
114        let highest_pruned_cp = self
115            .checkpoint_store
116            .get_highest_pruned_checkpoint_seq_number()
117            .map_err(Into::<StorageError>::into)?;
118
119        if highest_pruned_cp == 0 {
120            Ok(0)
121        } else {
122            Ok(highest_pruned_cp + 1)
123        }
124    }
125
126    fn get_full_checkpoint_contents_by_sequence_number(
127        &self,
128        sequence_number: CheckpointSequenceNumber,
129    ) -> Result<Option<FullCheckpointContents>, StorageError> {
130        self.checkpoint_store
131            .get_full_checkpoint_contents_by_sequence_number(sequence_number)
132            .map_err(Into::into)
133    }
134
135    fn get_full_checkpoint_contents(
136        &self,
137        digest: &CheckpointContentsDigest,
138    ) -> Result<Option<FullCheckpointContents>, StorageError> {
139        // First look to see if we saved the complete contents already.
140        if let Some(seq_num) = self
141            .checkpoint_store
142            .get_sequence_number_by_contents_digest(digest)
143            .map_err(iota_types::storage::error::Error::custom)?
144        {
145            let contents = self
146                .checkpoint_store
147                .get_full_checkpoint_contents_by_sequence_number(seq_num)
148                .map_err(iota_types::storage::error::Error::custom)?;
149            if contents.is_some() {
150                return Ok(contents);
151            }
152        }
153
154        // Otherwise gather it from the individual components.
155        // Note we can't insert the constructed contents into `full_checkpoint_content`,
156        // because it needs to be inserted along with
157        // `checkpoint_sequence_by_contents_digest` and `checkpoint_content`.
158        // However at this point it's likely we don't know the corresponding
159        // sequence number yet.
160        self.checkpoint_store
161            .get_checkpoint_contents(digest)
162            .map_err(iota_types::storage::error::Error::custom)?
163            .map(|contents| {
164                let mut transactions = Vec::with_capacity(contents.size());
165                for tx in contents.iter() {
166                    if let (Some(t), Some(e)) = (
167                        self.get_transaction(&tx.transaction)?,
168                        self.cache_traits
169                            .transaction_cache_reader
170                            .get_effects(&tx.effects)
171                            .map_err(iota_types::storage::error::Error::custom)?,
172                    ) {
173                        transactions.push(iota_types::base_types::ExecutionData::new(
174                            (*t).clone().into_inner(),
175                            e,
176                        ))
177                    } else {
178                        return Result::<
179                            Option<FullCheckpointContents>,
180                            iota_types::storage::error::Error,
181                        >::Ok(None);
182                    }
183                }
184                Ok(Some(
185                    FullCheckpointContents::from_contents_and_execution_data(
186                        contents,
187                        transactions.into_iter(),
188                    ),
189                ))
190            })
191            .transpose()
192            .map(|contents| contents.flatten())
193            .map_err(iota_types::storage::error::Error::custom)
194    }
195
196    fn get_committee(
197        &self,
198        epoch: EpochId,
199    ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
200        Ok(self.committee_store.get_committee(&epoch).unwrap())
201    }
202
203    fn get_transaction(
204        &self,
205        digest: &TransactionDigest,
206    ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
207        self.cache_traits
208            .transaction_cache_reader
209            .get_transaction_block(digest)
210            .map_err(StorageError::custom)
211    }
212
213    fn get_transaction_effects(
214        &self,
215        digest: &TransactionDigest,
216    ) -> Result<Option<TransactionEffects>, StorageError> {
217        self.cache_traits
218            .transaction_cache_reader
219            .get_executed_effects(digest)
220            .map_err(StorageError::custom)
221    }
222
223    fn get_events(
224        &self,
225        digest: &TransactionEventsDigest,
226    ) -> Result<Option<TransactionEvents>, StorageError> {
227        self.cache_traits
228            .transaction_cache_reader
229            .get_events(digest)
230            .map_err(StorageError::custom)
231    }
232
233    fn get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
234        self.checkpoint_store
235            .get_highest_executed_checkpoint()
236            .map_err(iota_types::storage::error::Error::custom)?
237            .ok_or_else(|| {
238                iota_types::storage::error::Error::missing("unable to get latest checkpoint")
239            })
240    }
241
242    fn get_checkpoint_contents_by_digest(
243        &self,
244        digest: &CheckpointContentsDigest,
245    ) -> iota_types::storage::error::Result<
246        Option<iota_types::messages_checkpoint::CheckpointContents>,
247    > {
248        self.checkpoint_store
249            .get_checkpoint_contents(digest)
250            .map_err(iota_types::storage::error::Error::custom)
251    }
252
253    fn get_checkpoint_contents_by_sequence_number(
254        &self,
255        sequence_number: CheckpointSequenceNumber,
256    ) -> iota_types::storage::error::Result<
257        Option<iota_types::messages_checkpoint::CheckpointContents>,
258    > {
259        match self.get_checkpoint_by_sequence_number(sequence_number) {
260            Ok(Some(checkpoint)) => {
261                self.get_checkpoint_contents_by_digest(&checkpoint.content_digest)
262            }
263            Ok(None) => Ok(None),
264            Err(e) => Err(e),
265        }
266    }
267}
268
269impl ObjectStore for RocksDbStore {
270    fn get_object(
271        &self,
272        object_id: &iota_types::base_types::ObjectID,
273    ) -> iota_types::storage::error::Result<Option<Object>> {
274        self.cache_traits.object_store.get_object(object_id)
275    }
276
277    fn get_object_by_key(
278        &self,
279        object_id: &iota_types::base_types::ObjectID,
280        version: iota_types::base_types::VersionNumber,
281    ) -> iota_types::storage::error::Result<Option<Object>> {
282        self.cache_traits
283            .object_store
284            .get_object_by_key(object_id, version)
285    }
286}
287
288impl WriteStore for RocksDbStore {
289    fn insert_checkpoint(
290        &self,
291        checkpoint: &VerifiedCheckpoint,
292    ) -> Result<(), iota_types::storage::error::Error> {
293        if let Some(EndOfEpochData {
294            next_epoch_committee,
295            ..
296        }) = checkpoint.end_of_epoch_data.as_ref()
297        {
298            let next_committee = next_epoch_committee.iter().cloned().collect();
299            let committee =
300                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
301            self.insert_committee(committee)?;
302        }
303
304        self.checkpoint_store
305            .insert_verified_checkpoint(checkpoint)
306            .map_err(Into::into)
307    }
308
309    fn update_highest_synced_checkpoint(
310        &self,
311        checkpoint: &VerifiedCheckpoint,
312    ) -> Result<(), iota_types::storage::error::Error> {
313        let mut locked = self.highest_synced_checkpoint.lock();
314        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
315            return Ok(());
316        }
317        self.checkpoint_store
318            .update_highest_synced_checkpoint(checkpoint)
319            .map_err(iota_types::storage::error::Error::custom)?;
320        *locked = Some(checkpoint.sequence_number);
321        Ok(())
322    }
323
324    fn update_highest_verified_checkpoint(
325        &self,
326        checkpoint: &VerifiedCheckpoint,
327    ) -> Result<(), iota_types::storage::error::Error> {
328        let mut locked = self.highest_verified_checkpoint.lock();
329        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
330            return Ok(());
331        }
332        self.checkpoint_store
333            .update_highest_verified_checkpoint(checkpoint)
334            .map_err(iota_types::storage::error::Error::custom)?;
335        *locked = Some(checkpoint.sequence_number);
336        Ok(())
337    }
338
339    fn insert_checkpoint_contents(
340        &self,
341        checkpoint: &VerifiedCheckpoint,
342        contents: VerifiedCheckpointContents,
343    ) -> Result<(), iota_types::storage::error::Error> {
344        self.cache_traits
345            .state_sync_store
346            .multi_insert_transaction_and_effects(contents.transactions())
347            .map_err(iota_types::storage::error::Error::custom)?;
348        self.checkpoint_store
349            .insert_verified_checkpoint_contents(checkpoint, contents)
350            .map_err(Into::into)
351    }
352
353    fn insert_committee(
354        &self,
355        new_committee: Committee,
356    ) -> Result<(), iota_types::storage::error::Error> {
357        self.committee_store
358            .insert_new_committee(&new_committee)
359            .unwrap();
360        Ok(())
361    }
362}
363
364pub struct RestReadStore {
365    state: Arc<AuthorityState>,
366    rocks: RocksDbStore,
367}
368
369impl RestReadStore {
370    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
371        Self { state, rocks }
372    }
373
374    fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
375        self.state.rest_index.as_deref().ok_or_else(|| {
376            iota_types::storage::error::Error::custom("rest index store is disabled")
377        })
378    }
379}
380
381impl ObjectStore for RestReadStore {
382    fn get_object(
383        &self,
384        object_id: &iota_types::base_types::ObjectID,
385    ) -> iota_types::storage::error::Result<Option<Object>> {
386        self.rocks.get_object(object_id)
387    }
388
389    fn get_object_by_key(
390        &self,
391        object_id: &iota_types::base_types::ObjectID,
392        version: iota_types::base_types::VersionNumber,
393    ) -> iota_types::storage::error::Result<Option<Object>> {
394        self.rocks.get_object_by_key(object_id, version)
395    }
396}
397
398impl ReadStore for RestReadStore {
399    fn get_committee(
400        &self,
401        epoch: EpochId,
402    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
403        self.rocks.get_committee(epoch)
404    }
405
406    fn get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
407        self.rocks.get_latest_checkpoint()
408    }
409
410    fn get_highest_verified_checkpoint(
411        &self,
412    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
413        self.rocks.get_highest_verified_checkpoint()
414    }
415
416    fn get_highest_synced_checkpoint(
417        &self,
418    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
419        self.rocks.get_highest_synced_checkpoint()
420    }
421
422    fn get_lowest_available_checkpoint(
423        &self,
424    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
425        self.rocks.get_lowest_available_checkpoint()
426    }
427
428    fn get_checkpoint_by_digest(
429        &self,
430        digest: &CheckpointDigest,
431    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
432        self.rocks.get_checkpoint_by_digest(digest)
433    }
434
435    fn get_checkpoint_by_sequence_number(
436        &self,
437        sequence_number: CheckpointSequenceNumber,
438    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
439        self.rocks
440            .get_checkpoint_by_sequence_number(sequence_number)
441    }
442
443    fn get_checkpoint_contents_by_digest(
444        &self,
445        digest: &CheckpointContentsDigest,
446    ) -> iota_types::storage::error::Result<
447        Option<iota_types::messages_checkpoint::CheckpointContents>,
448    > {
449        self.rocks.get_checkpoint_contents_by_digest(digest)
450    }
451
452    fn get_checkpoint_contents_by_sequence_number(
453        &self,
454        sequence_number: CheckpointSequenceNumber,
455    ) -> iota_types::storage::error::Result<
456        Option<iota_types::messages_checkpoint::CheckpointContents>,
457    > {
458        self.rocks
459            .get_checkpoint_contents_by_sequence_number(sequence_number)
460    }
461
462    fn get_transaction(
463        &self,
464        digest: &TransactionDigest,
465    ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
466        self.rocks.get_transaction(digest)
467    }
468
469    fn get_transaction_effects(
470        &self,
471        digest: &TransactionDigest,
472    ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
473        self.rocks.get_transaction_effects(digest)
474    }
475
476    fn get_events(
477        &self,
478        digest: &TransactionEventsDigest,
479    ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
480        self.rocks.get_events(digest)
481    }
482
483    fn get_full_checkpoint_contents_by_sequence_number(
484        &self,
485        sequence_number: CheckpointSequenceNumber,
486    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
487        self.rocks
488            .get_full_checkpoint_contents_by_sequence_number(sequence_number)
489    }
490
491    fn get_full_checkpoint_contents(
492        &self,
493        digest: &CheckpointContentsDigest,
494    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
495        self.rocks.get_full_checkpoint_contents(digest)
496    }
497}
498
499impl RestStateReader for RestReadStore {
500    fn get_transaction_checkpoint(
501        &self,
502        digest: &TransactionDigest,
503    ) -> iota_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
504        self.index()?
505            .get_transaction_info(digest)
506            .map(|maybe_info| maybe_info.map(|info| info.checkpoint))
507            .map_err(StorageError::custom)
508    }
509
510    fn get_lowest_available_checkpoint_objects(
511        &self,
512    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
513        let highest_pruned_cp = self
514            .state
515            .get_object_cache_reader()
516            .get_highest_pruned_checkpoint()
517            .map_err(StorageError::custom)?;
518
519        if highest_pruned_cp == 0 {
520            Ok(0)
521        } else {
522            Ok(highest_pruned_cp + 1)
523        }
524    }
525
526    fn get_chain_identifier(
527        &self,
528    ) -> iota_types::storage::error::Result<iota_types::digests::ChainIdentifier> {
529        self.state
530            .get_chain_identifier()
531            .ok_or_else(|| StorageError::missing("unable to query chain identifier"))
532    }
533
534    fn account_owned_objects_info_iter(
535        &self,
536        owner: IotaAddress,
537        cursor: Option<ObjectID>,
538    ) -> Result<Box<dyn Iterator<Item = AccountOwnedObjectInfo> + '_>> {
539        let iter = self.index()?.owner_iter(owner, cursor)?.map(
540            |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
541                AccountOwnedObjectInfo {
542                    owner,
543                    object_id,
544                    version,
545                    type_,
546                }
547            },
548        );
549
550        Ok(Box::new(iter) as _)
551    }
552
553    fn dynamic_field_iter(
554        &self,
555        parent: ObjectID,
556        cursor: Option<ObjectID>,
557    ) -> iota_types::storage::error::Result<
558        Box<dyn Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_>,
559    > {
560        let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
561
562        Ok(Box::new(iter) as _)
563    }
564
565    fn get_coin_info(
566        &self,
567        coin_type: &StructTag,
568    ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
569        self.index()?
570            .get_coin_info(coin_type)?
571            .map(
572                |CoinIndexInfo {
573                     coin_metadata_object_id,
574                     treasury_object_id,
575                 }| CoinInfo {
576                    coin_metadata_object_id,
577                    treasury_object_id,
578                },
579            )
580            .pipe(Ok)
581    }
582
583    fn get_epoch_last_checkpoint(
584        &self,
585        epoch_id: EpochId,
586    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
587        self.rocks
588            .checkpoint_store
589            .get_epoch_last_checkpoint(epoch_id)
590            .map_err(iota_types::storage::error::Error::custom)
591    }
592}