iota_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_types::{
8    base_types::{IotaAddress, ObjectID, TransactionDigest},
9    committee::{Committee, EpochId},
10    digests::TransactionEventsDigest,
11    effects::{TransactionEffects, TransactionEvents},
12    error::IotaError,
13    messages_checkpoint::{
14        CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16    },
17    object::Object,
18    storage::{
19        AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20        ObjectStore, ReadStore, RestIndexes, RestStateReader, TransactionInfo, WriteStore,
21        error::{Error as StorageError, Result},
22    },
23    transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28use tracing::instrument;
29use typed_store::TypedStoreError;
30
31use crate::{
32    authority::AuthorityState,
33    checkpoints::CheckpointStore,
34    epoch::committee_store::CommitteeStore,
35    execution_cache::ExecutionCacheTraitPointers,
36    rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
37};
38
39#[derive(Clone)]
40pub struct RocksDbStore {
41    cache_traits: ExecutionCacheTraitPointers,
42
43    committee_store: Arc<CommitteeStore>,
44    checkpoint_store: Arc<CheckpointStore>,
45    // in memory checkpoint watermark sequence numbers
46    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
47    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
48}
49
50impl RocksDbStore {
51    pub fn new(
52        cache_traits: ExecutionCacheTraitPointers,
53        committee_store: Arc<CommitteeStore>,
54        checkpoint_store: Arc<CheckpointStore>,
55    ) -> Self {
56        Self {
57            cache_traits,
58            committee_store,
59            checkpoint_store,
60            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
61            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
62        }
63    }
64
65    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
66        self.cache_traits
67            .object_cache_reader
68            .try_multi_get_objects_by_key(object_keys)
69    }
70
71    pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
72        Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
73    }
74}
75
76impl ReadStore for RocksDbStore {
77    fn try_get_checkpoint_by_digest(
78        &self,
79        digest: &CheckpointDigest,
80    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
81        self.checkpoint_store
82            .get_checkpoint_by_digest(digest)
83            .map_err(Into::into)
84    }
85
86    fn try_get_checkpoint_by_sequence_number(
87        &self,
88        sequence_number: CheckpointSequenceNumber,
89    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
90        self.checkpoint_store
91            .get_checkpoint_by_sequence_number(sequence_number)
92            .map_err(Into::into)
93    }
94
95    fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
96        self.checkpoint_store
97            .get_highest_verified_checkpoint()
98            .map(|maybe_checkpoint| {
99                maybe_checkpoint
100                    .expect("storage should have been initialized with genesis checkpoint")
101            })
102            .map_err(Into::into)
103    }
104
105    fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
106        self.checkpoint_store
107            .get_highest_synced_checkpoint()
108            .map(|maybe_checkpoint| {
109                maybe_checkpoint
110                    .expect("storage should have been initialized with genesis checkpoint")
111            })
112            .map_err(Into::into)
113    }
114
115    fn try_get_lowest_available_checkpoint(
116        &self,
117    ) -> Result<CheckpointSequenceNumber, StorageError> {
118        if let Some(highest_pruned_cp) = self
119            .checkpoint_store
120            .get_highest_pruned_checkpoint_seq_number()
121            .map_err(Into::<StorageError>::into)?
122        {
123            Ok(highest_pruned_cp + 1)
124        } else {
125            Ok(0)
126        }
127    }
128
129    fn try_get_full_checkpoint_contents_by_sequence_number(
130        &self,
131        sequence_number: CheckpointSequenceNumber,
132    ) -> Result<Option<FullCheckpointContents>, StorageError> {
133        self.checkpoint_store
134            .get_full_checkpoint_contents_by_sequence_number(sequence_number)
135            .map_err(Into::into)
136    }
137
138    fn try_get_full_checkpoint_contents(
139        &self,
140        digest: &CheckpointContentsDigest,
141    ) -> Result<Option<FullCheckpointContents>, StorageError> {
142        // First look to see if we saved the complete contents already.
143        if let Some(seq_num) = self
144            .checkpoint_store
145            .get_sequence_number_by_contents_digest(digest)
146            .map_err(iota_types::storage::error::Error::custom)?
147        {
148            let contents = self
149                .checkpoint_store
150                .get_full_checkpoint_contents_by_sequence_number(seq_num)
151                .map_err(iota_types::storage::error::Error::custom)?;
152            if contents.is_some() {
153                return Ok(contents);
154            }
155        }
156
157        // Otherwise gather it from the individual components.
158        // Note we can't insert the constructed contents into `full_checkpoint_content`,
159        // because it needs to be inserted along with
160        // `checkpoint_sequence_by_contents_digest` and `checkpoint_content`.
161        // However at this point it's likely we don't know the corresponding
162        // sequence number yet.
163        self.checkpoint_store
164            .get_checkpoint_contents(digest)
165            .map_err(iota_types::storage::error::Error::custom)?
166            .map(|contents| {
167                let mut transactions = Vec::with_capacity(contents.size());
168                for tx in contents.iter() {
169                    if let (Some(t), Some(e)) = (
170                        self.try_get_transaction(&tx.transaction)?,
171                        self.cache_traits
172                            .transaction_cache_reader
173                            .try_get_effects(&tx.effects)
174                            .map_err(iota_types::storage::error::Error::custom)?,
175                    ) {
176                        transactions.push(iota_types::base_types::ExecutionData::new(
177                            (*t).clone().into_inner(),
178                            e,
179                        ))
180                    } else {
181                        return Result::<
182                            Option<FullCheckpointContents>,
183                            iota_types::storage::error::Error,
184                        >::Ok(None);
185                    }
186                }
187                Ok(Some(
188                    FullCheckpointContents::from_contents_and_execution_data(
189                        contents,
190                        transactions.into_iter(),
191                    ),
192                ))
193            })
194            .transpose()
195            .map(|contents| contents.flatten())
196            .map_err(iota_types::storage::error::Error::custom)
197    }
198
199    fn try_get_committee(
200        &self,
201        epoch: EpochId,
202    ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
203        Ok(self.committee_store.get_committee(&epoch).unwrap())
204    }
205
206    fn try_get_transaction(
207        &self,
208        digest: &TransactionDigest,
209    ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
210        self.cache_traits
211            .transaction_cache_reader
212            .try_get_transaction_block(digest)
213            .map_err(StorageError::custom)
214    }
215
216    fn try_get_transaction_effects(
217        &self,
218        digest: &TransactionDigest,
219    ) -> Result<Option<TransactionEffects>, StorageError> {
220        self.cache_traits
221            .transaction_cache_reader
222            .try_get_executed_effects(digest)
223            .map_err(StorageError::custom)
224    }
225
226    fn try_get_events(
227        &self,
228        digest: &TransactionEventsDigest,
229    ) -> Result<Option<TransactionEvents>, StorageError> {
230        self.cache_traits
231            .transaction_cache_reader
232            .try_get_events(digest)
233            .map_err(StorageError::custom)
234    }
235
236    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
237        self.checkpoint_store
238            .get_highest_executed_checkpoint()
239            .map_err(iota_types::storage::error::Error::custom)?
240            .ok_or_else(|| {
241                iota_types::storage::error::Error::missing("unable to get latest checkpoint")
242            })
243    }
244
245    fn try_get_checkpoint_contents_by_digest(
246        &self,
247        digest: &CheckpointContentsDigest,
248    ) -> iota_types::storage::error::Result<
249        Option<iota_types::messages_checkpoint::CheckpointContents>,
250    > {
251        self.checkpoint_store
252            .get_checkpoint_contents(digest)
253            .map_err(iota_types::storage::error::Error::custom)
254    }
255
256    fn try_get_checkpoint_contents_by_sequence_number(
257        &self,
258        sequence_number: CheckpointSequenceNumber,
259    ) -> iota_types::storage::error::Result<
260        Option<iota_types::messages_checkpoint::CheckpointContents>,
261    > {
262        match self.try_get_checkpoint_by_sequence_number(sequence_number) {
263            Ok(Some(checkpoint)) => {
264                self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
265            }
266            Ok(None) => Ok(None),
267            Err(e) => Err(e),
268        }
269    }
270}
271
272impl ObjectStore for RocksDbStore {
273    fn try_get_object(
274        &self,
275        object_id: &iota_types::base_types::ObjectID,
276    ) -> iota_types::storage::error::Result<Option<Object>> {
277        self.cache_traits.object_store.try_get_object(object_id)
278    }
279
280    fn try_get_object_by_key(
281        &self,
282        object_id: &iota_types::base_types::ObjectID,
283        version: iota_types::base_types::VersionNumber,
284    ) -> iota_types::storage::error::Result<Option<Object>> {
285        self.cache_traits
286            .object_store
287            .try_get_object_by_key(object_id, version)
288    }
289}
290
291impl WriteStore for RocksDbStore {
292    #[instrument(level = "trace", skip_all)]
293    fn try_insert_checkpoint(
294        &self,
295        checkpoint: &VerifiedCheckpoint,
296    ) -> Result<(), iota_types::storage::error::Error> {
297        if let Some(EndOfEpochData {
298            next_epoch_committee,
299            ..
300        }) = checkpoint.end_of_epoch_data.as_ref()
301        {
302            let next_committee = next_epoch_committee.iter().cloned().collect();
303            let committee =
304                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
305            self.try_insert_committee(committee)?;
306        }
307
308        self.checkpoint_store
309            .insert_verified_checkpoint(checkpoint)
310            .map_err(Into::into)
311    }
312
313    fn try_update_highest_synced_checkpoint(
314        &self,
315        checkpoint: &VerifiedCheckpoint,
316    ) -> Result<(), iota_types::storage::error::Error> {
317        let mut locked = self.highest_synced_checkpoint.lock();
318        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
319            return Ok(());
320        }
321        self.checkpoint_store
322            .update_highest_synced_checkpoint(checkpoint)
323            .map_err(iota_types::storage::error::Error::custom)?;
324        *locked = Some(checkpoint.sequence_number);
325        Ok(())
326    }
327
328    fn try_update_highest_verified_checkpoint(
329        &self,
330        checkpoint: &VerifiedCheckpoint,
331    ) -> Result<(), iota_types::storage::error::Error> {
332        let mut locked = self.highest_verified_checkpoint.lock();
333        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
334            return Ok(());
335        }
336        self.checkpoint_store
337            .update_highest_verified_checkpoint(checkpoint)
338            .map_err(iota_types::storage::error::Error::custom)?;
339        *locked = Some(checkpoint.sequence_number);
340        Ok(())
341    }
342
343    fn try_insert_checkpoint_contents(
344        &self,
345        checkpoint: &VerifiedCheckpoint,
346        contents: VerifiedCheckpointContents,
347    ) -> Result<(), iota_types::storage::error::Error> {
348        self.cache_traits
349            .state_sync_store
350            .try_multi_insert_transaction_and_effects(contents.transactions())
351            .map_err(iota_types::storage::error::Error::custom)?;
352        self.checkpoint_store
353            .insert_verified_checkpoint_contents(checkpoint, contents)
354            .map_err(Into::into)
355    }
356
357    fn try_insert_committee(
358        &self,
359        new_committee: Committee,
360    ) -> Result<(), iota_types::storage::error::Error> {
361        self.committee_store
362            .insert_new_committee(&new_committee)
363            .unwrap();
364        Ok(())
365    }
366}
367
368pub struct RestReadStore {
369    state: Arc<AuthorityState>,
370    rocks: RocksDbStore,
371}
372
373impl RestReadStore {
374    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
375        Self { state, rocks }
376    }
377
378    fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
379        self.state.rest_index.as_deref().ok_or_else(|| {
380            iota_types::storage::error::Error::custom("rest index store is disabled")
381        })
382    }
383}
384
385impl ObjectStore for RestReadStore {
386    fn try_get_object(
387        &self,
388        object_id: &iota_types::base_types::ObjectID,
389    ) -> iota_types::storage::error::Result<Option<Object>> {
390        self.rocks.try_get_object(object_id)
391    }
392
393    fn try_get_object_by_key(
394        &self,
395        object_id: &iota_types::base_types::ObjectID,
396        version: iota_types::base_types::VersionNumber,
397    ) -> iota_types::storage::error::Result<Option<Object>> {
398        self.rocks.try_get_object_by_key(object_id, version)
399    }
400}
401
402impl ReadStore for RestReadStore {
403    fn try_get_committee(
404        &self,
405        epoch: EpochId,
406    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
407        self.rocks.try_get_committee(epoch)
408    }
409
410    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
411        self.rocks.try_get_latest_checkpoint()
412    }
413
414    fn try_get_highest_verified_checkpoint(
415        &self,
416    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
417        self.rocks.try_get_highest_verified_checkpoint()
418    }
419
420    fn try_get_highest_synced_checkpoint(
421        &self,
422    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
423        self.rocks.try_get_highest_synced_checkpoint()
424    }
425
426    fn try_get_lowest_available_checkpoint(
427        &self,
428    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
429        self.rocks.try_get_lowest_available_checkpoint()
430    }
431
432    fn try_get_checkpoint_by_digest(
433        &self,
434        digest: &CheckpointDigest,
435    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
436        self.rocks.try_get_checkpoint_by_digest(digest)
437    }
438
439    fn try_get_checkpoint_by_sequence_number(
440        &self,
441        sequence_number: CheckpointSequenceNumber,
442    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
443        self.rocks
444            .try_get_checkpoint_by_sequence_number(sequence_number)
445    }
446
447    fn try_get_checkpoint_contents_by_digest(
448        &self,
449        digest: &CheckpointContentsDigest,
450    ) -> iota_types::storage::error::Result<
451        Option<iota_types::messages_checkpoint::CheckpointContents>,
452    > {
453        self.rocks.try_get_checkpoint_contents_by_digest(digest)
454    }
455
456    fn try_get_checkpoint_contents_by_sequence_number(
457        &self,
458        sequence_number: CheckpointSequenceNumber,
459    ) -> iota_types::storage::error::Result<
460        Option<iota_types::messages_checkpoint::CheckpointContents>,
461    > {
462        self.rocks
463            .try_get_checkpoint_contents_by_sequence_number(sequence_number)
464    }
465
466    fn try_get_transaction(
467        &self,
468        digest: &TransactionDigest,
469    ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
470        self.rocks.try_get_transaction(digest)
471    }
472
473    fn try_get_transaction_effects(
474        &self,
475        digest: &TransactionDigest,
476    ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
477        self.rocks.try_get_transaction_effects(digest)
478    }
479
480    fn try_get_events(
481        &self,
482        digest: &TransactionEventsDigest,
483    ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
484        self.rocks.try_get_events(digest)
485    }
486
487    fn try_get_full_checkpoint_contents_by_sequence_number(
488        &self,
489        sequence_number: CheckpointSequenceNumber,
490    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
491        self.rocks
492            .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
493    }
494
495    fn try_get_full_checkpoint_contents(
496        &self,
497        digest: &CheckpointContentsDigest,
498    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
499        self.rocks.try_get_full_checkpoint_contents(digest)
500    }
501}
502
503impl RestStateReader for RestReadStore {
504    fn get_lowest_available_checkpoint_objects(
505        &self,
506    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
507        Ok(self
508            .state
509            .get_object_cache_reader()
510            .try_get_highest_pruned_checkpoint()
511            .map_err(StorageError::custom)?
512            .map(|cp| cp + 1)
513            .unwrap_or(0))
514    }
515
516    fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
517        Ok(self.state.get_chain_identifier())
518    }
519
520    fn get_epoch_last_checkpoint(
521        &self,
522        epoch_id: EpochId,
523    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
524        self.rocks
525            .checkpoint_store
526            .get_epoch_last_checkpoint(epoch_id)
527            .map_err(iota_types::storage::error::Error::custom)
528    }
529
530    fn indexes(&self) -> Option<&dyn RestIndexes> {
531        self.index().ok().map(|index| index as _)
532    }
533
534    fn get_struct_layout(
535        &self,
536        struct_tag: &move_core_types::language_storage::StructTag,
537    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
538        self.state
539            .load_epoch_store_one_call_per_task()
540            .executor()
541            // TODO(cache) - must read through cache
542            .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
543            .get_annotated_layout(struct_tag)
544            .map(|layout| layout.into_layout())
545            .map(Some)
546            .map_err(StorageError::custom)
547    }
548}
549
550impl RestIndexes for RestIndexStore {
551    // only used in "grpc-server"
552    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<iota_types::storage::EpochInfo>> {
553        self.get_epoch_info(epoch).map_err(StorageError::custom)
554    }
555
556    // used in both "grpc-server" and "rest-api"
557    fn get_transaction_info(
558        &self,
559        digest: &TransactionDigest,
560    ) -> iota_types::storage::error::Result<Option<TransactionInfo>> {
561        self.get_transaction_info(digest)
562            .map_err(StorageError::custom)
563    }
564
565    // only used in "rest-api"
566    fn account_owned_objects_info_iter(
567        &self,
568        owner: IotaAddress,
569        cursor: Option<ObjectID>,
570    ) -> Result<Box<dyn Iterator<Item = Result<AccountOwnedObjectInfo, TypedStoreError>> + '_>>
571    {
572        let iter = self.owner_iter(owner, cursor)?.map(|result| {
573            result.map(
574                |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
575                    AccountOwnedObjectInfo {
576                        owner,
577                        object_id,
578                        version,
579                        type_,
580                    }
581                },
582            )
583        });
584
585        Ok(Box::new(iter) as _)
586    }
587
588    // only used in "rest-api"
589    fn dynamic_field_iter(
590        &self,
591        parent: ObjectID,
592        cursor: Option<ObjectID>,
593    ) -> iota_types::storage::error::Result<
594        Box<
595            dyn Iterator<Item = Result<(DynamicFieldKey, DynamicFieldIndexInfo), TypedStoreError>>
596                + '_,
597        >,
598    > {
599        let iter = self.dynamic_field_iter(parent, cursor)?;
600        Ok(Box::new(iter) as _)
601    }
602
603    // only used in "rest-api"
604    fn get_coin_info(
605        &self,
606        coin_type: &StructTag,
607    ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
608        self.get_coin_info(coin_type)?
609            .map(
610                |CoinIndexInfo {
611                     coin_metadata_object_id,
612                     treasury_object_id,
613                 }| CoinInfo {
614                    coin_metadata_object_id,
615                    treasury_object_id,
616                },
617            )
618            .pipe(Ok)
619    }
620}