Skip to main content

iota_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use iota_node_storage::{GrpcIndexes, GrpcStateReader};
8use iota_sdk_types::StructTag;
9use iota_types::{
10    base_types::TransactionDigest,
11    committee::{Committee, EpochId},
12    effects::{TransactionEffects, TransactionEvents},
13    error::IotaError,
14    messages_checkpoint::{
15        CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
16        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
17    },
18    object::Object,
19    storage::{
20        ObjectKey, ObjectStore, ReadStore, WriteStore,
21        error::{Error as StorageError, Result},
22    },
23    transaction::VerifiedTransaction,
24};
25use parking_lot::Mutex;
26use tracing::instrument;
27
28use crate::{
29    authority::AuthorityState, checkpoints::CheckpointStore,
30    epoch::committee_store::CommitteeStore, execution_cache::ExecutionCacheTraitPointers,
31    grpc_indexes::GrpcIndexesStore,
32};
33
34#[derive(Clone)]
35pub struct RocksDbStore {
36    cache_traits: ExecutionCacheTraitPointers,
37
38    committee_store: Arc<CommitteeStore>,
39    checkpoint_store: Arc<CheckpointStore>,
40    // in memory checkpoint watermark sequence numbers
41    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
42    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
43}
44
45impl RocksDbStore {
46    pub fn new(
47        cache_traits: ExecutionCacheTraitPointers,
48        committee_store: Arc<CommitteeStore>,
49        checkpoint_store: Arc<CheckpointStore>,
50    ) -> Self {
51        Self {
52            cache_traits,
53            committee_store,
54            checkpoint_store,
55            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
56            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
57        }
58    }
59
60    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
61        self.cache_traits
62            .object_cache_reader
63            .try_multi_get_objects_by_key(object_keys)
64    }
65
66    pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
67        Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
68    }
69}
70
71impl ReadStore for RocksDbStore {
72    fn try_get_checkpoint_by_digest(
73        &self,
74        digest: &CheckpointDigest,
75    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
76        self.checkpoint_store
77            .get_checkpoint_by_digest(digest)
78            .map_err(Into::into)
79    }
80
81    fn try_get_checkpoint_by_sequence_number(
82        &self,
83        sequence_number: CheckpointSequenceNumber,
84    ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
85        self.checkpoint_store
86            .get_checkpoint_by_sequence_number(sequence_number)
87            .map_err(Into::into)
88    }
89
90    fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
91        self.checkpoint_store
92            .get_highest_verified_checkpoint()
93            .map(|maybe_checkpoint| {
94                maybe_checkpoint
95                    .expect("storage should have been initialized with genesis checkpoint")
96            })
97            .map_err(Into::into)
98    }
99
100    fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
101        self.checkpoint_store
102            .get_highest_synced_checkpoint()
103            .map(|maybe_checkpoint| {
104                maybe_checkpoint
105                    .expect("storage should have been initialized with genesis checkpoint")
106            })
107            .map_err(Into::into)
108    }
109
110    fn try_get_lowest_available_checkpoint(
111        &self,
112    ) -> Result<CheckpointSequenceNumber, StorageError> {
113        if let Some(highest_pruned_cp) = self
114            .checkpoint_store
115            .get_highest_pruned_checkpoint_seq_number()
116            .map_err(Into::<StorageError>::into)?
117        {
118            Ok(highest_pruned_cp + 1)
119        } else {
120            Ok(0)
121        }
122    }
123
124    fn try_get_full_checkpoint_contents_by_sequence_number(
125        &self,
126        sequence_number: CheckpointSequenceNumber,
127    ) -> Result<Option<FullCheckpointContents>, StorageError> {
128        self.checkpoint_store
129            .get_full_checkpoint_contents_by_sequence_number(sequence_number)
130            .map_err(Into::into)
131    }
132
133    fn try_get_full_checkpoint_contents(
134        &self,
135        digest: &CheckpointContentsDigest,
136    ) -> Result<Option<FullCheckpointContents>, StorageError> {
137        // First look to see if we saved the complete contents already.
138        if let Some(seq_num) = self
139            .checkpoint_store
140            .get_sequence_number_by_contents_digest(digest)
141            .map_err(iota_types::storage::error::Error::custom)?
142        {
143            let contents = self
144                .checkpoint_store
145                .get_full_checkpoint_contents_by_sequence_number(seq_num)
146                .map_err(iota_types::storage::error::Error::custom)?;
147            if contents.is_some() {
148                return Ok(contents);
149            }
150        }
151
152        // Otherwise gather it from the individual components.
153        // Note we can't insert the constructed contents into `full_checkpoint_content`,
154        // because it needs to be inserted along with
155        // `checkpoint_sequence_by_contents_digest` and `checkpoint_content`.
156        // However at this point it's likely we don't know the corresponding
157        // sequence number yet.
158        self.checkpoint_store
159            .get_checkpoint_contents(digest)
160            .map_err(iota_types::storage::error::Error::custom)?
161            .map(|contents| {
162                let mut transactions = Vec::with_capacity(contents.size());
163                for tx in contents.iter() {
164                    if let (Some(t), Some(e)) = (
165                        self.try_get_transaction(&tx.transaction)?,
166                        self.cache_traits
167                            .transaction_cache_reader
168                            .try_get_effects(&tx.effects)
169                            .map_err(iota_types::storage::error::Error::custom)?,
170                    ) {
171                        transactions.push(iota_types::base_types::ExecutionData::new(
172                            (*t).clone().into_inner(),
173                            e,
174                        ))
175                    } else {
176                        return Result::<
177                            Option<FullCheckpointContents>,
178                            iota_types::storage::error::Error,
179                        >::Ok(None);
180                    }
181                }
182                Ok(Some(
183                    FullCheckpointContents::from_contents_and_execution_data(
184                        contents,
185                        transactions.into_iter(),
186                    ),
187                ))
188            })
189            .transpose()
190            .map(|contents| contents.flatten())
191            .map_err(iota_types::storage::error::Error::custom)
192    }
193
194    fn try_get_committee(
195        &self,
196        epoch: EpochId,
197    ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
198        Ok(self.committee_store.get_committee(&epoch).unwrap())
199    }
200
201    fn try_get_transaction(
202        &self,
203        digest: &TransactionDigest,
204    ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
205        self.cache_traits
206            .transaction_cache_reader
207            .try_get_transaction_block(digest)
208            .map_err(StorageError::custom)
209    }
210
211    fn try_get_transaction_effects(
212        &self,
213        digest: &TransactionDigest,
214    ) -> Result<Option<TransactionEffects>, StorageError> {
215        self.cache_traits
216            .transaction_cache_reader
217            .try_get_executed_effects(digest)
218            .map_err(StorageError::custom)
219    }
220
221    fn try_get_events(
222        &self,
223        digest: &TransactionDigest,
224    ) -> Result<Option<TransactionEvents>, StorageError> {
225        self.cache_traits
226            .transaction_cache_reader
227            .try_get_events(digest)
228            .map_err(StorageError::custom)
229    }
230
231    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
232        self.checkpoint_store
233            .get_highest_executed_checkpoint()
234            .map_err(iota_types::storage::error::Error::custom)?
235            .ok_or_else(|| {
236                iota_types::storage::error::Error::missing("unable to get latest checkpoint")
237            })
238    }
239
240    fn try_get_checkpoint_contents_by_digest(
241        &self,
242        digest: &CheckpointContentsDigest,
243    ) -> iota_types::storage::error::Result<
244        Option<iota_types::messages_checkpoint::CheckpointContents>,
245    > {
246        self.checkpoint_store
247            .get_checkpoint_contents(digest)
248            .map_err(iota_types::storage::error::Error::custom)
249    }
250
251    fn try_get_checkpoint_contents_by_sequence_number(
252        &self,
253        sequence_number: CheckpointSequenceNumber,
254    ) -> iota_types::storage::error::Result<
255        Option<iota_types::messages_checkpoint::CheckpointContents>,
256    > {
257        match self.try_get_checkpoint_by_sequence_number(sequence_number) {
258            Ok(Some(checkpoint)) => {
259                self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
260            }
261            Ok(None) => Ok(None),
262            Err(e) => Err(e),
263        }
264    }
265}
266
267impl ObjectStore for RocksDbStore {
268    fn try_get_object(
269        &self,
270        object_id: &iota_sdk_types::ObjectId,
271    ) -> iota_types::storage::error::Result<Option<Object>> {
272        self.cache_traits.object_store.try_get_object(object_id)
273    }
274
275    fn try_get_object_by_key(
276        &self,
277        object_id: &iota_sdk_types::ObjectId,
278        version: iota_types::base_types::VersionNumber,
279    ) -> iota_types::storage::error::Result<Option<Object>> {
280        self.cache_traits
281            .object_store
282            .try_get_object_by_key(object_id, version)
283    }
284}
285
286impl WriteStore for RocksDbStore {
287    #[instrument(level = "trace", skip_all)]
288    fn try_insert_checkpoint(
289        &self,
290        checkpoint: &VerifiedCheckpoint,
291    ) -> Result<(), iota_types::storage::error::Error> {
292        if let Some(EndOfEpochData {
293            next_epoch_committee,
294            ..
295        }) = checkpoint.end_of_epoch_data.as_ref()
296        {
297            let next_committee = next_epoch_committee.iter().cloned().collect();
298            let committee =
299                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
300            self.try_insert_committee(committee)?;
301        }
302
303        self.checkpoint_store
304            .insert_verified_checkpoint(checkpoint)
305            .map_err(Into::into)
306    }
307
308    fn try_update_highest_synced_checkpoint(
309        &self,
310        checkpoint: &VerifiedCheckpoint,
311    ) -> Result<(), iota_types::storage::error::Error> {
312        let mut locked = self.highest_synced_checkpoint.lock();
313        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
314            return Ok(());
315        }
316        self.checkpoint_store
317            .update_highest_synced_checkpoint(checkpoint)
318            .map_err(iota_types::storage::error::Error::custom)?;
319        *locked = Some(checkpoint.sequence_number);
320        Ok(())
321    }
322
323    fn try_update_highest_verified_checkpoint(
324        &self,
325        checkpoint: &VerifiedCheckpoint,
326    ) -> Result<(), iota_types::storage::error::Error> {
327        let mut locked = self.highest_verified_checkpoint.lock();
328        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
329            return Ok(());
330        }
331        self.checkpoint_store
332            .update_highest_verified_checkpoint(checkpoint)
333            .map_err(iota_types::storage::error::Error::custom)?;
334        *locked = Some(checkpoint.sequence_number);
335        Ok(())
336    }
337
338    fn try_insert_checkpoint_contents(
339        &self,
340        checkpoint: &VerifiedCheckpoint,
341        contents: VerifiedCheckpointContents,
342    ) -> Result<(), iota_types::storage::error::Error> {
343        self.cache_traits
344            .state_sync_store
345            .try_multi_insert_transaction_and_effects(contents.transactions())
346            .map_err(iota_types::storage::error::Error::custom)?;
347        self.checkpoint_store
348            .insert_verified_checkpoint_contents(checkpoint, contents)
349            .map_err(Into::into)
350    }
351
352    fn try_insert_committee(
353        &self,
354        new_committee: Committee,
355    ) -> Result<(), iota_types::storage::error::Error> {
356        self.committee_store
357            .insert_new_committee(&new_committee)
358            .unwrap();
359        Ok(())
360    }
361}
362
363pub struct GrpcReadStore {
364    state: Arc<AuthorityState>,
365    rocks: RocksDbStore,
366}
367
368impl GrpcReadStore {
369    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
370        Self { state, rocks }
371    }
372
373    fn grpc_indexes_store(&self) -> iota_types::storage::error::Result<&GrpcIndexesStore> {
374        self.state.grpc_indexes_store.as_deref().ok_or_else(|| {
375            iota_types::storage::error::Error::custom("gRPC index store is disabled")
376        })
377    }
378}
379
380impl ObjectStore for GrpcReadStore {
381    fn try_get_object(
382        &self,
383        object_id: &iota_sdk_types::ObjectId,
384    ) -> iota_types::storage::error::Result<Option<Object>> {
385        self.rocks.try_get_object(object_id)
386    }
387
388    fn try_get_object_by_key(
389        &self,
390        object_id: &iota_sdk_types::ObjectId,
391        version: iota_types::base_types::VersionNumber,
392    ) -> iota_types::storage::error::Result<Option<Object>> {
393        self.rocks.try_get_object_by_key(object_id, version)
394    }
395}
396
397impl ReadStore for GrpcReadStore {
398    fn try_get_committee(
399        &self,
400        epoch: EpochId,
401    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
402        self.rocks.try_get_committee(epoch)
403    }
404
405    fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
406        self.rocks.try_get_latest_checkpoint()
407    }
408
409    fn try_get_highest_verified_checkpoint(
410        &self,
411    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
412        self.rocks.try_get_highest_verified_checkpoint()
413    }
414
415    fn try_get_highest_synced_checkpoint(
416        &self,
417    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
418        self.rocks.try_get_highest_synced_checkpoint()
419    }
420
421    fn try_get_lowest_available_checkpoint(
422        &self,
423    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
424        self.rocks.try_get_lowest_available_checkpoint()
425    }
426
427    fn try_get_checkpoint_by_digest(
428        &self,
429        digest: &CheckpointDigest,
430    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
431        self.rocks.try_get_checkpoint_by_digest(digest)
432    }
433
434    fn try_get_checkpoint_by_sequence_number(
435        &self,
436        sequence_number: CheckpointSequenceNumber,
437    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
438        self.rocks
439            .try_get_checkpoint_by_sequence_number(sequence_number)
440    }
441
442    fn try_get_checkpoint_contents_by_digest(
443        &self,
444        digest: &CheckpointContentsDigest,
445    ) -> iota_types::storage::error::Result<
446        Option<iota_types::messages_checkpoint::CheckpointContents>,
447    > {
448        self.rocks.try_get_checkpoint_contents_by_digest(digest)
449    }
450
451    fn try_get_checkpoint_contents_by_sequence_number(
452        &self,
453        sequence_number: CheckpointSequenceNumber,
454    ) -> iota_types::storage::error::Result<
455        Option<iota_types::messages_checkpoint::CheckpointContents>,
456    > {
457        self.rocks
458            .try_get_checkpoint_contents_by_sequence_number(sequence_number)
459    }
460
461    fn try_get_transaction(
462        &self,
463        digest: &TransactionDigest,
464    ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
465        self.rocks.try_get_transaction(digest)
466    }
467
468    fn try_get_transaction_effects(
469        &self,
470        digest: &TransactionDigest,
471    ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
472        self.rocks.try_get_transaction_effects(digest)
473    }
474
475    fn try_get_events(
476        &self,
477        digest: &TransactionDigest,
478    ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
479        self.rocks.try_get_events(digest)
480    }
481
482    fn try_get_full_checkpoint_contents_by_sequence_number(
483        &self,
484        sequence_number: CheckpointSequenceNumber,
485    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
486        self.rocks
487            .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
488    }
489
490    fn try_get_full_checkpoint_contents(
491        &self,
492        digest: &CheckpointContentsDigest,
493    ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
494        self.rocks.try_get_full_checkpoint_contents(digest)
495    }
496}
497
498impl GrpcStateReader for GrpcReadStore {
499    fn get_lowest_available_checkpoint_objects(
500        &self,
501    ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
502        Ok(self
503            .state
504            .get_object_cache_reader()
505            .try_get_highest_pruned_checkpoint()
506            .map_err(StorageError::custom)?
507            .map(|cp| cp + 1)
508            .unwrap_or(0))
509    }
510
511    fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
512        Ok(self.state.get_chain_identifier())
513    }
514
515    fn get_epoch_last_checkpoint(
516        &self,
517        epoch_id: EpochId,
518    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
519        self.rocks
520            .checkpoint_store
521            .get_epoch_last_checkpoint(epoch_id)
522            .map_err(iota_types::storage::error::Error::custom)
523    }
524
525    fn grpc_indexes(&self) -> Option<&dyn GrpcIndexes> {
526        self.grpc_indexes_store().ok().map(|index| index as _)
527    }
528
529    fn get_struct_layout(
530        &self,
531        struct_tag: &StructTag,
532    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
533        self.state
534            .load_epoch_store_one_call_per_task()
535            .executor()
536            // TODO(cache) - must read through cache
537            .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
538            .get_annotated_layout(struct_tag)
539            .map(|layout| layout.into_layout())
540            .map(Some)
541            .map_err(StorageError::custom)
542    }
543}