iota_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_node_storage::{GrpcIndexes, GrpcStateReader};
8use iota_types::{
9    base_types::{StructTag, TransactionDigest},
10    committee::{Committee, EpochId},
11    effects::{TransactionEffects, TransactionEvents},
12    error::IotaError,
13    messages_checkpoint::{
14        CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16    },
17    object::Object,
18    storage::{
19        ObjectKey, ObjectStore, ReadStore, WriteStore,
20        error::{Error as StorageError, Result},
21    },
22    transaction::VerifiedTransaction,
23};
24use parking_lot::Mutex;
25use tracing::instrument;
26
27use crate::{
28    authority::AuthorityState, checkpoints::CheckpointStore,
29    epoch::committee_store::CommitteeStore, execution_cache::ExecutionCacheTraitPointers,
30    grpc_indexes::GrpcIndexesStore,
31};
32
33#[derive(Clone)]
34pub struct RocksDbStore {
35    cache_traits: ExecutionCacheTraitPointers,
36
37    committee_store: Arc<CommitteeStore>,
38    checkpoint_store: Arc<CheckpointStore>,
39    // in memory checkpoint watermark sequence numbers
40    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
41    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
42}
43
44impl RocksDbStore {
45    pub fn new(
46        cache_traits: ExecutionCacheTraitPointers,
47        committee_store: Arc<CommitteeStore>,
48        checkpoint_store: Arc<CheckpointStore>,
49    ) -> Self {
50        Self {
51            cache_traits,
52            committee_store,
53            checkpoint_store,
54            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
55            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
56        }
57    }
58
59    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
60        self.cache_traits
61            .object_cache_reader
62            .try_multi_get_objects_by_key(object_keys)
63    }
64
65    pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
66        Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
67    }
68}
69
70impl ReadStore for RocksDbStore {
71    fn try_get_checkpoint_by_digest(
72        &self,
73        digest: &CheckpointDigest,
74    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
75        self.checkpoint_store
76            .get_checkpoint_by_digest(digest)
77            .map_err(Into::into)
78    }
79
80    fn try_get_checkpoint_by_sequence_number(
81        &self,
82        sequence_number: CheckpointSequenceNumber,
83    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
84        self.checkpoint_store
85            .get_checkpoint_by_sequence_number(sequence_number)
86            .map_err(Into::into)
87    }
88
89    fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
90        self.checkpoint_store
91            .get_highest_verified_checkpoint()
92            .map(|maybe_checkpoint| {
93                maybe_checkpoint
94                    .expect("storage should have been initialized with genesis checkpoint")
95            })
96            .map_err(Into::into)
97    }
98
99    fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
100        self.checkpoint_store
101            .get_highest_synced_checkpoint()
102            .map(|maybe_checkpoint| {
103                maybe_checkpoint
104                    .expect("storage should have been initialized with genesis checkpoint")
105            })
106            .map_err(Into::into)
107    }
108
109    fn try_get_lowest_available_checkpoint(
110        &self,
111    ) -> Result<CheckpointSequenceNumber, StorageError> {
112        if let Some(highest_pruned_cp) = self
113            .checkpoint_store
114            .get_highest_pruned_checkpoint_seq_number()
115            .map_err(Into::<StorageError>::into)?
116        {
117            Ok(highest_pruned_cp + 1)
118        } else {
119            Ok(0)
120        }
121    }
122
123    fn try_get_full_checkpoint_contents_by_sequence_number(
124        &self,
125        sequence_number: CheckpointSequenceNumber,
126    ) -> Result<Option<FullCheckpointContents>, StorageError> {
127        self.checkpoint_store
128            .get_full_checkpoint_contents_by_sequence_number(sequence_number)
129            .map_err(Into::into)
130    }
131
132    fn try_get_full_checkpoint_contents(
133        &self,
134        digest: &CheckpointContentsDigest,
135    ) -> Result<Option<FullCheckpointContents>, StorageError> {
136        // First look to see if we saved the complete contents already.
137        if let Some(seq_num) = self
138            .checkpoint_store
139            .get_sequence_number_by_contents_digest(digest)
140            .map_err(iota_types::storage::error::Error::custom)?
141        {
142            let contents = self
143                .checkpoint_store
144                .get_full_checkpoint_contents_by_sequence_number(seq_num)
145                .map_err(iota_types::storage::error::Error::custom)?;
146            if contents.is_some() {
147                return Ok(contents);
148            }
149        }
150
151        // Otherwise gather it from the individual components.
152        // Note we can't insert the constructed contents into `full_checkpoint_content`,
153        // because it needs to be inserted along with
154        // `checkpoint_sequence_by_contents_digest` and `checkpoint_content`.
155        // However at this point it's likely we don't know the corresponding
156        // sequence number yet.
157        self.checkpoint_store
158            .get_checkpoint_contents(digest)
159            .map_err(iota_types::storage::error::Error::custom)?
160            .map(|contents| {
161                let mut transactions = Vec::with_capacity(contents.size());
162                for tx in contents.iter() {
163                    if let (Some(t), Some(e)) = (
164                        self.try_get_transaction(&tx.transaction)?,
165                        self.cache_traits
166                            .transaction_cache_reader
167                            .try_get_effects(&tx.effects)
168                            .map_err(iota_types::storage::error::Error::custom)?,
169                    ) {
170                        transactions.push(iota_types::base_types::ExecutionData::new(
171                            (*t).clone().into_inner(),
172                            e,
173                        ))
174                    } else {
175                        return Result::<
176                            Option<FullCheckpointContents>,
177                            iota_types::storage::error::Error,
178                        >::Ok(None);
179                    }
180                }
181                Ok(Some(
182                    FullCheckpointContents::from_contents_and_execution_data(
183                        contents,
184                        transactions.into_iter(),
185                    ),
186                ))
187            })
188            .transpose()
189            .map(|contents| contents.flatten())
190            .map_err(iota_types::storage::error::Error::custom)
191    }
192
193    fn try_get_committee(
194        &self,
195        epoch: EpochId,
196    ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
197        Ok(self.committee_store.get_committee(&epoch).unwrap())
198    }
199
200    fn try_get_transaction(
201        &self,
202        digest: &TransactionDigest,
203    ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
204        self.cache_traits
205            .transaction_cache_reader
206            .try_get_transaction_block(digest)
207            .map_err(StorageError::custom)
208    }
209
210    fn try_get_transaction_effects(
211        &self,
212        digest: &TransactionDigest,
213    ) -> Result<Option<TransactionEffects>, StorageError> {
214        self.cache_traits
215            .transaction_cache_reader
216            .try_get_executed_effects(digest)
217            .map_err(StorageError::custom)
218    }
219
220    fn try_get_events(
221        &self,
222        digest: &TransactionDigest,
223    ) -> Result<Option<TransactionEvents>, StorageError> {
224        self.cache_traits
225            .transaction_cache_reader
226            .try_get_events(digest)
227            .map_err(StorageError::custom)
228    }
229
230    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
231        self.checkpoint_store
232            .get_highest_executed_checkpoint()
233            .map_err(iota_types::storage::error::Error::custom)?
234            .ok_or_else(|| {
235                iota_types::storage::error::Error::missing("unable to get latest checkpoint")
236            })
237    }
238
239    fn try_get_checkpoint_contents_by_digest(
240        &self,
241        digest: &CheckpointContentsDigest,
242    ) -> iota_types::storage::error::Result<
243        Option<iota_types::messages_checkpoint::CheckpointContents>,
244    > {
245        self.checkpoint_store
246            .get_checkpoint_contents(digest)
247            .map_err(iota_types::storage::error::Error::custom)
248    }
249
250    fn try_get_checkpoint_contents_by_sequence_number(
251        &self,
252        sequence_number: CheckpointSequenceNumber,
253    ) -> iota_types::storage::error::Result<
254        Option<iota_types::messages_checkpoint::CheckpointContents>,
255    > {
256        match self.try_get_checkpoint_by_sequence_number(sequence_number) {
257            Ok(Some(checkpoint)) => {
258                self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
259            }
260            Ok(None) => Ok(None),
261            Err(e) => Err(e),
262        }
263    }
264}
265
266impl ObjectStore for RocksDbStore {
267    fn try_get_object(
268        &self,
269        object_id: &iota_types::base_types::ObjectID,
270    ) -> iota_types::storage::error::Result<Option<Object>> {
271        self.cache_traits.object_store.try_get_object(object_id)
272    }
273
274    fn try_get_object_by_key(
275        &self,
276        object_id: &iota_types::base_types::ObjectID,
277        version: iota_types::base_types::VersionNumber,
278    ) -> iota_types::storage::error::Result<Option<Object>> {
279        self.cache_traits
280            .object_store
281            .try_get_object_by_key(object_id, version)
282    }
283}
284
285impl WriteStore for RocksDbStore {
286    #[instrument(level = "trace", skip_all)]
287    fn try_insert_checkpoint(
288        &self,
289        checkpoint: &VerifiedCheckpoint,
290    ) -> Result<(), iota_types::storage::error::Error> {
291        if let Some(EndOfEpochData {
292            next_epoch_committee,
293            ..
294        }) = checkpoint.end_of_epoch_data.as_ref()
295        {
296            let next_committee = next_epoch_committee.iter().cloned().collect();
297            let committee =
298                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
299            self.try_insert_committee(committee)?;
300        }
301
302        self.checkpoint_store
303            .insert_verified_checkpoint(checkpoint)
304            .map_err(Into::into)
305    }
306
307    fn try_update_highest_synced_checkpoint(
308        &self,
309        checkpoint: &VerifiedCheckpoint,
310    ) -> Result<(), iota_types::storage::error::Error> {
311        let mut locked = self.highest_synced_checkpoint.lock();
312        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
313            return Ok(());
314        }
315        self.checkpoint_store
316            .update_highest_synced_checkpoint(checkpoint)
317            .map_err(iota_types::storage::error::Error::custom)?;
318        *locked = Some(checkpoint.sequence_number);
319        Ok(())
320    }
321
322    fn try_update_highest_verified_checkpoint(
323        &self,
324        checkpoint: &VerifiedCheckpoint,
325    ) -> Result<(), iota_types::storage::error::Error> {
326        let mut locked = self.highest_verified_checkpoint.lock();
327        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
328            return Ok(());
329        }
330        self.checkpoint_store
331            .update_highest_verified_checkpoint(checkpoint)
332            .map_err(iota_types::storage::error::Error::custom)?;
333        *locked = Some(checkpoint.sequence_number);
334        Ok(())
335    }
336
337    fn try_insert_checkpoint_contents(
338        &self,
339        checkpoint: &VerifiedCheckpoint,
340        contents: VerifiedCheckpointContents,
341    ) -> Result<(), iota_types::storage::error::Error> {
342        self.cache_traits
343            .state_sync_store
344            .try_multi_insert_transaction_and_effects(contents.transactions())
345            .map_err(iota_types::storage::error::Error::custom)?;
346        self.checkpoint_store
347            .insert_verified_checkpoint_contents(checkpoint, contents)
348            .map_err(Into::into)
349    }
350
351    fn try_insert_committee(
352        &self,
353        new_committee: Committee,
354    ) -> Result<(), iota_types::storage::error::Error> {
355        self.committee_store
356            .insert_new_committee(&new_committee)
357            .unwrap();
358        Ok(())
359    }
360}
361
362pub struct GrpcReadStore {
363    state: Arc<AuthorityState>,
364    rocks: RocksDbStore,
365}
366
367impl GrpcReadStore {
368    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
369        Self { state, rocks }
370    }
371
372    fn grpc_indexes_store(&self) -> iota_types::storage::error::Result<&GrpcIndexesStore> {
373        self.state.grpc_indexes_store.as_deref().ok_or_else(|| {
374            iota_types::storage::error::Error::custom("gRPC index store is disabled")
375        })
376    }
377}
378
379impl ObjectStore for GrpcReadStore {
380    fn try_get_object(
381        &self,
382        object_id: &iota_types::base_types::ObjectID,
383    ) -> iota_types::storage::error::Result<Option<Object>> {
384        self.rocks.try_get_object(object_id)
385    }
386
387    fn try_get_object_by_key(
388        &self,
389        object_id: &iota_types::base_types::ObjectID,
390        version: iota_types::base_types::VersionNumber,
391    ) -> iota_types::storage::error::Result<Option<Object>> {
392        self.rocks.try_get_object_by_key(object_id, version)
393    }
394}
395
396impl ReadStore for GrpcReadStore {
397    fn try_get_committee(
398        &self,
399        epoch: EpochId,
400    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
401        self.rocks.try_get_committee(epoch)
402    }
403
404    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
405        self.rocks.try_get_latest_checkpoint()
406    }
407
408    fn try_get_highest_verified_checkpoint(
409        &self,
410    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
411        self.rocks.try_get_highest_verified_checkpoint()
412    }
413
414    fn try_get_highest_synced_checkpoint(
415        &self,
416    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
417        self.rocks.try_get_highest_synced_checkpoint()
418    }
419
420    fn try_get_lowest_available_checkpoint(
421        &self,
422    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
423        self.rocks.try_get_lowest_available_checkpoint()
424    }
425
426    fn try_get_checkpoint_by_digest(
427        &self,
428        digest: &CheckpointDigest,
429    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
430        self.rocks.try_get_checkpoint_by_digest(digest)
431    }
432
433    fn try_get_checkpoint_by_sequence_number(
434        &self,
435        sequence_number: CheckpointSequenceNumber,
436    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
437        self.rocks
438            .try_get_checkpoint_by_sequence_number(sequence_number)
439    }
440
441    fn try_get_checkpoint_contents_by_digest(
442        &self,
443        digest: &CheckpointContentsDigest,
444    ) -> iota_types::storage::error::Result<
445        Option<iota_types::messages_checkpoint::CheckpointContents>,
446    > {
447        self.rocks.try_get_checkpoint_contents_by_digest(digest)
448    }
449
450    fn try_get_checkpoint_contents_by_sequence_number(
451        &self,
452        sequence_number: CheckpointSequenceNumber,
453    ) -> iota_types::storage::error::Result<
454        Option<iota_types::messages_checkpoint::CheckpointContents>,
455    > {
456        self.rocks
457            .try_get_checkpoint_contents_by_sequence_number(sequence_number)
458    }
459
460    fn try_get_transaction(
461        &self,
462        digest: &TransactionDigest,
463    ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
464        self.rocks.try_get_transaction(digest)
465    }
466
467    fn try_get_transaction_effects(
468        &self,
469        digest: &TransactionDigest,
470    ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
471        self.rocks.try_get_transaction_effects(digest)
472    }
473
474    fn try_get_events(
475        &self,
476        digest: &TransactionDigest,
477    ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
478        self.rocks.try_get_events(digest)
479    }
480
481    fn try_get_full_checkpoint_contents_by_sequence_number(
482        &self,
483        sequence_number: CheckpointSequenceNumber,
484    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
485        self.rocks
486            .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
487    }
488
489    fn try_get_full_checkpoint_contents(
490        &self,
491        digest: &CheckpointContentsDigest,
492    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
493        self.rocks.try_get_full_checkpoint_contents(digest)
494    }
495}
496
497impl GrpcStateReader for GrpcReadStore {
498    fn get_lowest_available_checkpoint_objects(
499        &self,
500    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
501        Ok(self
502            .state
503            .get_object_cache_reader()
504            .try_get_highest_pruned_checkpoint()
505            .map_err(StorageError::custom)?
506            .map(|cp| cp + 1)
507            .unwrap_or(0))
508    }
509
510    fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
511        Ok(self.state.get_chain_identifier())
512    }
513
514    fn get_epoch_last_checkpoint(
515        &self,
516        epoch_id: EpochId,
517    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
518        self.rocks
519            .checkpoint_store
520            .get_epoch_last_checkpoint(epoch_id)
521            .map_err(iota_types::storage::error::Error::custom)
522    }
523
524    fn grpc_indexes(&self) -> Option<&dyn GrpcIndexes> {
525        self.grpc_indexes_store().ok().map(|index| index as _)
526    }
527
528    fn get_struct_layout(
529        &self,
530        struct_tag: &StructTag,
531    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
532        self.state
533            .load_epoch_store_one_call_per_task()
534            .executor()
535            // TODO(cache) - must read through cache
536            .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
537            .get_annotated_layout(struct_tag)
538            .map(|layout| layout.into_layout())
539            .map(Some)
540            .map_err(StorageError::custom)
541    }
542}