1use std::sync::Arc;
6
7use iota_types::{
8 base_types::{IotaAddress, ObjectID, TransactionDigest},
9 committee::{Committee, EpochId},
10 digests::TransactionEventsDigest,
11 effects::{TransactionEffects, TransactionEvents},
12 error::IotaError,
13 messages_checkpoint::{
14 CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16 },
17 object::Object,
18 storage::{
19 AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20 ObjectStore, ReadStore, RestIndexes, RestStateReader, TransactionInfo, WriteStore,
21 error::{Error as StorageError, Result},
22 },
23 transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28use tracing::instrument;
29use typed_store::TypedStoreError;
30
31use crate::{
32 authority::AuthorityState,
33 checkpoints::CheckpointStore,
34 epoch::committee_store::CommitteeStore,
35 execution_cache::ExecutionCacheTraitPointers,
36 rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
37};
38
/// Storage handle that combines the execution cache accessors with the
/// committee store and the checkpoint store.
///
/// The two stores and both cached markers are held behind `Arc`, so clones of
/// this handle share them.
#[derive(Clone)]
pub struct RocksDbStore {
    /// Execution cache accessors used for object, transaction, effects and
    /// event lookups.
    cache_traits: ExecutionCacheTraitPointers,

    /// Store that committees are read from and inserted into.
    committee_store: Arc<CommitteeStore>,
    /// Store that checkpoints and checkpoint contents are read from and
    /// written to.
    checkpoint_store: Arc<CheckpointStore>,
    /// Sequence number recorded by the last successful highest-verified
    /// update made through this handle; `None` until the first one.
    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
    /// Sequence number recorded by the last successful highest-synced update
    /// made through this handle; `None` until the first one.
    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
}
49
50impl RocksDbStore {
51 pub fn new(
52 cache_traits: ExecutionCacheTraitPointers,
53 committee_store: Arc<CommitteeStore>,
54 checkpoint_store: Arc<CheckpointStore>,
55 ) -> Self {
56 Self {
57 cache_traits,
58 committee_store,
59 checkpoint_store,
60 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
61 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
62 }
63 }
64
65 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
66 self.cache_traits
67 .object_cache_reader
68 .try_multi_get_objects_by_key(object_keys)
69 }
70
71 pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
72 Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
73 }
74}
75
76impl ReadStore for RocksDbStore {
77 fn try_get_checkpoint_by_digest(
78 &self,
79 digest: &CheckpointDigest,
80 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
81 self.checkpoint_store
82 .get_checkpoint_by_digest(digest)
83 .map_err(Into::into)
84 }
85
86 fn try_get_checkpoint_by_sequence_number(
87 &self,
88 sequence_number: CheckpointSequenceNumber,
89 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
90 self.checkpoint_store
91 .get_checkpoint_by_sequence_number(sequence_number)
92 .map_err(Into::into)
93 }
94
95 fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
96 self.checkpoint_store
97 .get_highest_verified_checkpoint()
98 .map(|maybe_checkpoint| {
99 maybe_checkpoint
100 .expect("storage should have been initialized with genesis checkpoint")
101 })
102 .map_err(Into::into)
103 }
104
105 fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
106 self.checkpoint_store
107 .get_highest_synced_checkpoint()
108 .map(|maybe_checkpoint| {
109 maybe_checkpoint
110 .expect("storage should have been initialized with genesis checkpoint")
111 })
112 .map_err(Into::into)
113 }
114
115 fn try_get_lowest_available_checkpoint(
116 &self,
117 ) -> Result<CheckpointSequenceNumber, StorageError> {
118 if let Some(highest_pruned_cp) = self
119 .checkpoint_store
120 .get_highest_pruned_checkpoint_seq_number()
121 .map_err(Into::<StorageError>::into)?
122 {
123 Ok(highest_pruned_cp + 1)
124 } else {
125 Ok(0)
126 }
127 }
128
129 fn try_get_full_checkpoint_contents_by_sequence_number(
130 &self,
131 sequence_number: CheckpointSequenceNumber,
132 ) -> Result<Option<FullCheckpointContents>, StorageError> {
133 self.checkpoint_store
134 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
135 .map_err(Into::into)
136 }
137
138 fn try_get_full_checkpoint_contents(
139 &self,
140 digest: &CheckpointContentsDigest,
141 ) -> Result<Option<FullCheckpointContents>, StorageError> {
142 if let Some(seq_num) = self
144 .checkpoint_store
145 .get_sequence_number_by_contents_digest(digest)
146 .map_err(iota_types::storage::error::Error::custom)?
147 {
148 let contents = self
149 .checkpoint_store
150 .get_full_checkpoint_contents_by_sequence_number(seq_num)
151 .map_err(iota_types::storage::error::Error::custom)?;
152 if contents.is_some() {
153 return Ok(contents);
154 }
155 }
156
157 self.checkpoint_store
164 .get_checkpoint_contents(digest)
165 .map_err(iota_types::storage::error::Error::custom)?
166 .map(|contents| {
167 let mut transactions = Vec::with_capacity(contents.size());
168 for tx in contents.iter() {
169 if let (Some(t), Some(e)) = (
170 self.try_get_transaction(&tx.transaction)?,
171 self.cache_traits
172 .transaction_cache_reader
173 .try_get_effects(&tx.effects)
174 .map_err(iota_types::storage::error::Error::custom)?,
175 ) {
176 transactions.push(iota_types::base_types::ExecutionData::new(
177 (*t).clone().into_inner(),
178 e,
179 ))
180 } else {
181 return Result::<
182 Option<FullCheckpointContents>,
183 iota_types::storage::error::Error,
184 >::Ok(None);
185 }
186 }
187 Ok(Some(
188 FullCheckpointContents::from_contents_and_execution_data(
189 contents,
190 transactions.into_iter(),
191 ),
192 ))
193 })
194 .transpose()
195 .map(|contents| contents.flatten())
196 .map_err(iota_types::storage::error::Error::custom)
197 }
198
199 fn try_get_committee(
200 &self,
201 epoch: EpochId,
202 ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
203 Ok(self.committee_store.get_committee(&epoch).unwrap())
204 }
205
206 fn try_get_transaction(
207 &self,
208 digest: &TransactionDigest,
209 ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
210 self.cache_traits
211 .transaction_cache_reader
212 .try_get_transaction_block(digest)
213 .map_err(StorageError::custom)
214 }
215
216 fn try_get_transaction_effects(
217 &self,
218 digest: &TransactionDigest,
219 ) -> Result<Option<TransactionEffects>, StorageError> {
220 self.cache_traits
221 .transaction_cache_reader
222 .try_get_executed_effects(digest)
223 .map_err(StorageError::custom)
224 }
225
226 fn try_get_events(
227 &self,
228 digest: &TransactionEventsDigest,
229 ) -> Result<Option<TransactionEvents>, StorageError> {
230 self.cache_traits
231 .transaction_cache_reader
232 .try_get_events(digest)
233 .map_err(StorageError::custom)
234 }
235
236 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
237 self.checkpoint_store
238 .get_highest_executed_checkpoint()
239 .map_err(iota_types::storage::error::Error::custom)?
240 .ok_or_else(|| {
241 iota_types::storage::error::Error::missing("unable to get latest checkpoint")
242 })
243 }
244
245 fn try_get_checkpoint_contents_by_digest(
246 &self,
247 digest: &CheckpointContentsDigest,
248 ) -> iota_types::storage::error::Result<
249 Option<iota_types::messages_checkpoint::CheckpointContents>,
250 > {
251 self.checkpoint_store
252 .get_checkpoint_contents(digest)
253 .map_err(iota_types::storage::error::Error::custom)
254 }
255
256 fn try_get_checkpoint_contents_by_sequence_number(
257 &self,
258 sequence_number: CheckpointSequenceNumber,
259 ) -> iota_types::storage::error::Result<
260 Option<iota_types::messages_checkpoint::CheckpointContents>,
261 > {
262 match self.try_get_checkpoint_by_sequence_number(sequence_number) {
263 Ok(Some(checkpoint)) => {
264 self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
265 }
266 Ok(None) => Ok(None),
267 Err(e) => Err(e),
268 }
269 }
270}
271
272impl ObjectStore for RocksDbStore {
273 fn try_get_object(
274 &self,
275 object_id: &iota_types::base_types::ObjectID,
276 ) -> iota_types::storage::error::Result<Option<Object>> {
277 self.cache_traits.object_store.try_get_object(object_id)
278 }
279
280 fn try_get_object_by_key(
281 &self,
282 object_id: &iota_types::base_types::ObjectID,
283 version: iota_types::base_types::VersionNumber,
284 ) -> iota_types::storage::error::Result<Option<Object>> {
285 self.cache_traits
286 .object_store
287 .try_get_object_by_key(object_id, version)
288 }
289}
290
291impl WriteStore for RocksDbStore {
292 #[instrument(level = "trace", skip_all)]
293 fn try_insert_checkpoint(
294 &self,
295 checkpoint: &VerifiedCheckpoint,
296 ) -> Result<(), iota_types::storage::error::Error> {
297 if let Some(EndOfEpochData {
298 next_epoch_committee,
299 ..
300 }) = checkpoint.end_of_epoch_data.as_ref()
301 {
302 let next_committee = next_epoch_committee.iter().cloned().collect();
303 let committee =
304 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
305 self.try_insert_committee(committee)?;
306 }
307
308 self.checkpoint_store
309 .insert_verified_checkpoint(checkpoint)
310 .map_err(Into::into)
311 }
312
313 fn try_update_highest_synced_checkpoint(
314 &self,
315 checkpoint: &VerifiedCheckpoint,
316 ) -> Result<(), iota_types::storage::error::Error> {
317 let mut locked = self.highest_synced_checkpoint.lock();
318 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
319 return Ok(());
320 }
321 self.checkpoint_store
322 .update_highest_synced_checkpoint(checkpoint)
323 .map_err(iota_types::storage::error::Error::custom)?;
324 *locked = Some(checkpoint.sequence_number);
325 Ok(())
326 }
327
328 fn try_update_highest_verified_checkpoint(
329 &self,
330 checkpoint: &VerifiedCheckpoint,
331 ) -> Result<(), iota_types::storage::error::Error> {
332 let mut locked = self.highest_verified_checkpoint.lock();
333 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
334 return Ok(());
335 }
336 self.checkpoint_store
337 .update_highest_verified_checkpoint(checkpoint)
338 .map_err(iota_types::storage::error::Error::custom)?;
339 *locked = Some(checkpoint.sequence_number);
340 Ok(())
341 }
342
343 fn try_insert_checkpoint_contents(
344 &self,
345 checkpoint: &VerifiedCheckpoint,
346 contents: VerifiedCheckpointContents,
347 ) -> Result<(), iota_types::storage::error::Error> {
348 self.cache_traits
349 .state_sync_store
350 .try_multi_insert_transaction_and_effects(contents.transactions())
351 .map_err(iota_types::storage::error::Error::custom)?;
352 self.checkpoint_store
353 .insert_verified_checkpoint_contents(checkpoint, contents)
354 .map_err(Into::into)
355 }
356
357 fn try_insert_committee(
358 &self,
359 new_committee: Committee,
360 ) -> Result<(), iota_types::storage::error::Error> {
361 self.committee_store
362 .insert_new_committee(&new_committee)
363 .unwrap();
364 Ok(())
365 }
366}
367
/// Read-only store used by the REST layer, backed by the authority state and
/// a [`RocksDbStore`].
pub struct RestReadStore {
    /// Authority state; provides the optional REST index, the chain
    /// identifier, the object cache reader and the epoch store.
    state: Arc<AuthorityState>,
    /// Store that object, checkpoint, transaction and committee reads are
    /// delegated to.
    rocks: RocksDbStore,
}
372
373impl RestReadStore {
374 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
375 Self { state, rocks }
376 }
377
378 fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
379 self.state.rest_index.as_deref().ok_or_else(|| {
380 iota_types::storage::error::Error::custom("rest index store is disabled")
381 })
382 }
383}
384
385impl ObjectStore for RestReadStore {
386 fn try_get_object(
387 &self,
388 object_id: &iota_types::base_types::ObjectID,
389 ) -> iota_types::storage::error::Result<Option<Object>> {
390 self.rocks.try_get_object(object_id)
391 }
392
393 fn try_get_object_by_key(
394 &self,
395 object_id: &iota_types::base_types::ObjectID,
396 version: iota_types::base_types::VersionNumber,
397 ) -> iota_types::storage::error::Result<Option<Object>> {
398 self.rocks.try_get_object_by_key(object_id, version)
399 }
400}
401
402impl ReadStore for RestReadStore {
403 fn try_get_committee(
404 &self,
405 epoch: EpochId,
406 ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
407 self.rocks.try_get_committee(epoch)
408 }
409
410 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
411 self.rocks.try_get_latest_checkpoint()
412 }
413
414 fn try_get_highest_verified_checkpoint(
415 &self,
416 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
417 self.rocks.try_get_highest_verified_checkpoint()
418 }
419
420 fn try_get_highest_synced_checkpoint(
421 &self,
422 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
423 self.rocks.try_get_highest_synced_checkpoint()
424 }
425
426 fn try_get_lowest_available_checkpoint(
427 &self,
428 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
429 self.rocks.try_get_lowest_available_checkpoint()
430 }
431
432 fn try_get_checkpoint_by_digest(
433 &self,
434 digest: &CheckpointDigest,
435 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
436 self.rocks.try_get_checkpoint_by_digest(digest)
437 }
438
439 fn try_get_checkpoint_by_sequence_number(
440 &self,
441 sequence_number: CheckpointSequenceNumber,
442 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
443 self.rocks
444 .try_get_checkpoint_by_sequence_number(sequence_number)
445 }
446
447 fn try_get_checkpoint_contents_by_digest(
448 &self,
449 digest: &CheckpointContentsDigest,
450 ) -> iota_types::storage::error::Result<
451 Option<iota_types::messages_checkpoint::CheckpointContents>,
452 > {
453 self.rocks.try_get_checkpoint_contents_by_digest(digest)
454 }
455
456 fn try_get_checkpoint_contents_by_sequence_number(
457 &self,
458 sequence_number: CheckpointSequenceNumber,
459 ) -> iota_types::storage::error::Result<
460 Option<iota_types::messages_checkpoint::CheckpointContents>,
461 > {
462 self.rocks
463 .try_get_checkpoint_contents_by_sequence_number(sequence_number)
464 }
465
466 fn try_get_transaction(
467 &self,
468 digest: &TransactionDigest,
469 ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
470 self.rocks.try_get_transaction(digest)
471 }
472
473 fn try_get_transaction_effects(
474 &self,
475 digest: &TransactionDigest,
476 ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
477 self.rocks.try_get_transaction_effects(digest)
478 }
479
480 fn try_get_events(
481 &self,
482 digest: &TransactionEventsDigest,
483 ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
484 self.rocks.try_get_events(digest)
485 }
486
487 fn try_get_full_checkpoint_contents_by_sequence_number(
488 &self,
489 sequence_number: CheckpointSequenceNumber,
490 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
491 self.rocks
492 .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
493 }
494
495 fn try_get_full_checkpoint_contents(
496 &self,
497 digest: &CheckpointContentsDigest,
498 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
499 self.rocks.try_get_full_checkpoint_contents(digest)
500 }
501}
502
503impl RestStateReader for RestReadStore {
504 fn get_lowest_available_checkpoint_objects(
505 &self,
506 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
507 Ok(self
508 .state
509 .get_object_cache_reader()
510 .try_get_highest_pruned_checkpoint()
511 .map_err(StorageError::custom)?
512 .map(|cp| cp + 1)
513 .unwrap_or(0))
514 }
515
516 fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
517 Ok(self.state.get_chain_identifier())
518 }
519
520 fn get_epoch_last_checkpoint(
521 &self,
522 epoch_id: EpochId,
523 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
524 self.rocks
525 .checkpoint_store
526 .get_epoch_last_checkpoint(epoch_id)
527 .map_err(iota_types::storage::error::Error::custom)
528 }
529
530 fn indexes(&self) -> Option<&dyn RestIndexes> {
531 self.index().ok().map(|index| index as _)
532 }
533
534 fn get_struct_layout(
535 &self,
536 struct_tag: &move_core_types::language_storage::StructTag,
537 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
538 self.state
539 .load_epoch_store_one_call_per_task()
540 .executor()
541 .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
543 .get_annotated_layout(struct_tag)
544 .map(|layout| layout.into_layout())
545 .map(Some)
546 .map_err(StorageError::custom)
547 }
548}
549
550impl RestIndexes for RestIndexStore {
551 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<iota_types::storage::EpochInfo>> {
553 self.get_epoch_info(epoch).map_err(StorageError::custom)
554 }
555
556 fn get_transaction_info(
558 &self,
559 digest: &TransactionDigest,
560 ) -> iota_types::storage::error::Result<Option<TransactionInfo>> {
561 self.get_transaction_info(digest)
562 .map_err(StorageError::custom)
563 }
564
565 fn account_owned_objects_info_iter(
567 &self,
568 owner: IotaAddress,
569 cursor: Option<ObjectID>,
570 ) -> Result<Box<dyn Iterator<Item = Result<AccountOwnedObjectInfo, TypedStoreError>> + '_>>
571 {
572 let iter = self.owner_iter(owner, cursor)?.map(|result| {
573 result.map(
574 |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
575 AccountOwnedObjectInfo {
576 owner,
577 object_id,
578 version,
579 type_,
580 }
581 },
582 )
583 });
584
585 Ok(Box::new(iter) as _)
586 }
587
588 fn dynamic_field_iter(
590 &self,
591 parent: ObjectID,
592 cursor: Option<ObjectID>,
593 ) -> iota_types::storage::error::Result<
594 Box<
595 dyn Iterator<Item = Result<(DynamicFieldKey, DynamicFieldIndexInfo), TypedStoreError>>
596 + '_,
597 >,
598 > {
599 let iter = self.dynamic_field_iter(parent, cursor)?;
600 Ok(Box::new(iter) as _)
601 }
602
603 fn get_coin_info(
605 &self,
606 coin_type: &StructTag,
607 ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
608 self.get_coin_info(coin_type)?
609 .map(
610 |CoinIndexInfo {
611 coin_metadata_object_id,
612 treasury_object_id,
613 }| CoinInfo {
614 coin_metadata_object_id,
615 treasury_object_id,
616 },
617 )
618 .pipe(Ok)
619 }
620}